Skip to content

Commit

Permalink
ENH: Improve signals throttling
Browse files Browse the repository at this point in the history
  • Loading branch information
Punzo committed Jun 4, 2024
1 parent 7dca0f3 commit 5255c53
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 12 deletions.
61 changes: 49 additions & 12 deletions Libs/Core/ctkJobScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ void ctkJobScheduler::stopAllJobs(bool stopPersistentJobs)
// In addition, to speedup the cleaning of jobs, we remove them with one call removeJobs,
// instead of using the signal
QMap<QString, QMetaObject::Connection> connections = d->JobsConnections.value(jobUID);
QObject::disconnect(connections.value("userStopped"));
//QObject::disconnect(connections.value("userStopped"));
job->setStatus(ctkAbstractJob::JobStatus::UserStopped);
initializedStoppedJobsUIDs.append(jobUID);
}
Expand All @@ -585,7 +585,7 @@ void ctkJobScheduler::stopAllJobs(bool stopPersistentJobs)
{
this->deleteWorker(job->jobUID());
QMap<QString, QMetaObject::Connection> connections = d->JobsConnections.value(jobUID);
QObject::disconnect(connections.value("userStopped"));
//QObject::disconnect(connections.value("userStopped"));
job->setStatus(ctkAbstractJob::JobStatus::UserStopped);
initializedStoppedJobsUIDs.append(job->jobUID());
}
Expand All @@ -605,8 +605,6 @@ void ctkJobScheduler::stopAllJobs(bool stopPersistentJobs)

worker->requestCancel();
}

d->clearBactchedJobsLists();
}

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -652,7 +650,7 @@ void ctkJobScheduler::stopJobsByJobUIDs(const QStringList &jobUIDs)
// In addition, to speedup the cleaning of jobs, we remove them with one call removeJobs,
// instead of using the signal
QMap<QString, QMetaObject::Connection> connections = d->JobsConnections.value(jobUID);
QObject::disconnect(connections.value("userStopped"));
//QObject::disconnect(connections.value("userStopped"));
job->setStatus(ctkAbstractJob::JobStatus::UserStopped);
initializedStoppedJobsUIDs.append(job->jobUID());
}
Expand Down Expand Up @@ -680,7 +678,7 @@ void ctkJobScheduler::stopJobsByJobUIDs(const QStringList &jobUIDs)
{
this->deleteWorker(job->jobUID());
QMap<QString, QMetaObject::Connection> connections = d->JobsConnections.value(jobUID);
QObject::disconnect(connections.value("userStopped"));
//QObject::disconnect(connections.value("userStopped"));
job->setStatus(ctkAbstractJob::JobStatus::UserStopped);
initializedStoppedJobsUIDs.append(job->jobUID());
}
Expand Down Expand Up @@ -861,12 +859,51 @@ void ctkJobScheduler::emitThrottledSignals()
{
Q_D(ctkJobScheduler);

emit this->jobStarted(d->BatchedJobsStarted);
emit this->jobUserStopped(d->BatchedJobsUserStopped);
emit this->jobFinished(d->BatchedJobsFinished);
emit this->jobAttemptFailed(d->BatchedJobsAttemptFailed);
emit this->jobFailed(d->BatchedJobsFailed);
int totalEmitted = 0;
if (!d->BatchedJobsStarted.isEmpty() && totalEmitted < d->MaximumBatchedSignalsForTimeInterval)
{
int count = qMin(d->MaximumBatchedSignalsForTimeInterval - totalEmitted, d->BatchedJobsStarted.size());
emit this->jobStarted(d->BatchedJobsStarted.mid(0, count));
d->BatchedJobsStarted = d->BatchedJobsStarted.mid(count);
totalEmitted += count;
}
if (!d->BatchedJobsUserStopped.isEmpty() && totalEmitted < d->MaximumBatchedSignalsForTimeInterval)
{
int count = qMin(d->MaximumBatchedSignalsForTimeInterval - totalEmitted, d->BatchedJobsUserStopped.size());
emit this->jobUserStopped(d->BatchedJobsUserStopped.mid(0, count));
d->BatchedJobsUserStopped = d->BatchedJobsUserStopped.mid(count);
totalEmitted += count;
}
if (!d->BatchedJobsFinished.isEmpty() && totalEmitted < d->MaximumBatchedSignalsForTimeInterval)
{
int count = qMin(d->MaximumBatchedSignalsForTimeInterval - totalEmitted, d->BatchedJobsFinished.size());
emit this->jobFinished(d->BatchedJobsFinished.mid(0, count));
d->BatchedJobsFinished = d->BatchedJobsFinished.mid(count);
totalEmitted += count;
}
if (!d->BatchedJobsAttemptFailed.isEmpty() && totalEmitted < d->MaximumBatchedSignalsForTimeInterval)
{
int count = qMin(d->MaximumBatchedSignalsForTimeInterval - totalEmitted, d->BatchedJobsAttemptFailed.size());
emit this->jobAttemptFailed(d->BatchedJobsAttemptFailed.mid(0, count));
d->BatchedJobsAttemptFailed = d->BatchedJobsAttemptFailed.mid(count);
totalEmitted += count;
}
if (!d->BatchedJobsFailed.isEmpty() && totalEmitted < d->MaximumBatchedSignalsForTimeInterval)
{
int count = qMin(d->MaximumBatchedSignalsForTimeInterval - totalEmitted, d->BatchedJobsFailed.size());
emit this->jobFailed(d->BatchedJobsFailed.mid(0, count));
d->BatchedJobsFailed = d->BatchedJobsFailed.mid(count);
totalEmitted += count;
}

emit this->progressJobDetail(d->BatchedJobsProgress);
d->BatchedJobsProgress.clear();

d->clearBactchedJobsLists();
int numberOfSignalsNotSent = d->BatchedJobsStarted.size() + d->BatchedJobsUserStopped.size() +
d->BatchedJobsFinished.size() + d->BatchedJobsAttemptFailed.size() +
d->BatchedJobsFailed.size();
if (numberOfSignalsNotSent != 0 && !d->ThrottleTimer->isActive())
{
d->ThrottleTimer->start(d->ThrottleTimeInterval);
}
}
1 change: 1 addition & 0 deletions Libs/Core/ctkJobScheduler_p.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public Q_SLOTS:
QList<QVariant> BatchedJobsProgress;
QSharedPointer<QTimer> ThrottleTimer;
int ThrottleTimeInterval{300};
int MaximumBatchedSignalsForTimeInterval{20};
};

#endif

0 comments on commit 5255c53

Please sign in to comment.