Skip to content

Commit

Permalink
ARTEMIS-4314 Small Tweak: using executor directly if no delay
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Jun 17, 2023
1 parent 9b5dbf4 commit c6a82ff
Showing 1 changed file with 21 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,23 +208,29 @@ public Executor getExecutor() {
}

private void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) {
scheduledExecutorService.schedule(() -> {
// use queue executor to sync on message count metric
handle.getExecutor().execute(() -> {
if (clientConsumer != null) {
if (0L == handle.getMessageCount()) {
flow(handle.getCreditWindow());
pendingPullCredit.set(handle.getCreditWindow());
} else {
if (0 == delay) {
clientConsumer.resetIfSlowConsumer();
pendingPullCredit.set(0);
}
scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);

Runnable runnable = () -> {
if (clientConsumer != null) {
if (0L == handle.getMessageCount()) {
flow(handle.getCreditWindow());
pendingPullCredit.set(handle.getCreditWindow());
} else {
if (0 == delay) {
clientConsumer.resetIfSlowConsumer();
pendingPullCredit.set(0);
}
scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
}
});
}, delay, TimeUnit.SECONDS);
}
};

if (delay == 0) { // if delay==0 just use the executor directly
handle.getExecutor().execute(runnable);
} else {
scheduledExecutorService.schedule(() -> {
handle.getExecutor().execute(runnable);
}, delay, TimeUnit.SECONDS);
}
}

private void flow(int creditWindow) {
Expand Down

0 comments on commit c6a82ff

Please sign in to comment.