Skip to content

Commit

Permalink
Merge pull request #215 from hchepey-clari/SystemMaxPollCount
Browse files Browse the repository at this point in the history
Introducing SystemMaxPollCount to tune and optimise the System Thread…
  • Loading branch information
v1r3n committed Jul 28, 2024
2 parents a0335c9 + 9eed49c commit 7e877cb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public class ConductorProperties {
/** The number of threads to be used within the threadpool for system task workers. */
private int systemTaskWorkerThreadCount = Runtime.getRuntime().availableProcessors() * 2;

/** The max number of the threads to be polled within the threadpool for system task workers. */
private int systemTaskMaxPollCount = systemTaskWorkerThreadCount;

/**
* The interval (in seconds) after which a system task will be checked by the system task worker
* for completion.
Expand Down Expand Up @@ -360,6 +363,14 @@ public void setSystemTaskWorkerThreadCount(int systemTaskWorkerThreadCount) {
this.systemTaskWorkerThreadCount = systemTaskWorkerThreadCount;
}

public int getSystemTaskMaxPollCount() {
return systemTaskMaxPollCount;
}

public void setSystemTaskMaxPollCount(int systemTaskMaxPollCount) {
this.systemTaskMaxPollCount = systemTaskMaxPollCount;
}

public Duration getSystemTaskWorkerCallbackDuration() {
return systemTaskWorkerCallbackDuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,14 @@ void pollAndExecute(WorkflowSystemTask systemTask, String queueName) {
SemaphoreUtil semaphoreUtil = executionConfig.getSemaphoreUtil();
ExecutorService executorService = executionConfig.getExecutorService();
String taskName = QueueUtils.getTaskType(queueName);

int messagesToAcquire = semaphoreUtil.availableSlots();
final int systemTaskMaxPollCount = properties.getSystemTaskMaxPollCount();
int maxSystemTasksToAcquire =
(systemTaskMaxPollCount < 1
|| systemTaskMaxPollCount
> properties.getSystemTaskWorkerThreadCount())
? properties.getSystemTaskWorkerThreadCount()
: systemTaskMaxPollCount;
int messagesToAcquire = Math.min(semaphoreUtil.availableSlots(), maxSystemTasksToAcquire);

try {
if (messagesToAcquire <= 0 || !semaphoreUtil.acquireSlots(messagesToAcquire)) {
Expand Down

0 comments on commit 7e877cb

Please sign in to comment.