From 3e31914735b5da87c22229283afe3731767f219d Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 14 Dec 2021 17:03:50 +0100 Subject: [PATCH 1/2] fix(polling): respect request timeout settings --- ...tLongPollingActivateJobsRequestsState.java | 9 ++- .../job/LongPollingActivateJobsHandler.java | 70 +++++++++++-------- .../api/job/LongPollingActivateJobsTest.java | 68 ++++++++++++++++++ 3 files changed, 117 insertions(+), 30 deletions(-) diff --git a/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/InFlightLongPollingActivateJobsRequestsState.java b/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/InFlightLongPollingActivateJobsRequestsState.java index fb2b83212558..e5ba39231f74 100644 --- a/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/InFlightLongPollingActivateJobsRequestsState.java +++ b/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/InFlightLongPollingActivateJobsRequestsState.java @@ -47,6 +47,10 @@ public void setFailedAttempts(final int failedAttempts) { } } + public boolean shouldAttempt(final int attemptThreshold) { + return failedAttempts < attemptThreshold; + } + public void resetFailedAttempts() { setFailedAttempts(0); } @@ -112,10 +116,11 @@ public boolean hasActiveRequests() { /** * Returns whether the request should be repeated. A request should be repeated if the failed - * attempts were reset to 0 (because new jobs became available) whilst the request was running + * attempts were reset to 0 (because new jobs became available) whilst the request was running, + * and if the request's long polling is enabled. */ public boolean shouldBeRepeated(final LongPollingActivateJobsRequest request) { - return activeRequestsToBeRepeated.contains(request); + return activeRequestsToBeRepeated.contains(request) && !request.isLongPollingDisabled(); } public boolean shouldNotifyAndStartNotification() { diff --git a/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsHandler.java b/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsHandler.java index 38facba9bf87..336058b5d4d1 100644 --- a/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsHandler.java +++ b/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsHandler.java @@ -84,16 +84,45 @@ public void activateJobs( activateJobs(longPollingRequest); } + protected void completeOrResubmitRequest( + final LongPollingActivateJobsRequest request, final boolean activateImmediately) { + if (request.isLongPollingDisabled()) { + // request is not supposed to use the + // long polling capabilities -> just + // complete the request + request.complete(); + return; + } + + if (request.isTimedOut()) { + // already timed out, nothing to do here + return; + } + + final var type = request.getType(); + final var state = getJobTypeState(type); + + if (!request.hasScheduledTimer()) { + addTimeOut(state, request); + } + + if (activateImmediately) { + activateJobs(request); + } else { + enqueueRequest(state, request); + } + } + public void activateJobs(final LongPollingActivateJobsRequest request) { actor.run( () -> { final InFlightLongPollingActivateJobsRequestsState state = getJobTypeState(request.getType()); - if (state.getFailedAttempts() < failedAttemptThreshold) { + if (state.shouldAttempt(failedAttemptThreshold)) { activateJobsUnchecked(state, request); } else { - completeOrEnqueueRequest(state, request); + completeOrResubmitRequest(request, false); } }); } @@ -168,15 +197,9 @@ private void onCompleted( actor.submit( () -> { state.incrementFailedAttempts(currentTimeMillis()); - final boolean shouldBeRepeated = state.shouldBeRepeated(request); state.removeActiveRequest(request); - - if (shouldBeRepeated) { - activateJobs(request); - } else { - completeOrEnqueueRequest(getJobTypeState(request.getType()), request); - } + completeOrResubmitRequest(request, shouldBeRepeated); }); } } else { @@ -219,26 +242,17 @@ private void resetFailedAttemptsAndHandlePendingRequests(final String jobType) { } } - private void completeOrEnqueueRequest( + private void enqueueRequest( final InFlightLongPollingActivateJobsRequestsState state, final LongPollingActivateJobsRequest request) { - if (request.isLongPollingDisabled()) { - request.complete(); - return; - } - if (!request.isTimedOut()) { - LOG.trace( - "Worker '{}' asked for '{}' jobs of type '{}', but none are available. This request will" - + " be kept open until a new job of this type is created or until timeout of '{}'.", - request.getWorker(), - request.getMaxJobsToActivate(), - request.getType(), - request.getLongPollingTimeout(longPollingTimeout)); - state.enqueueRequest(request); - if (!request.hasScheduledTimer()) { - addTimeOut(state, request); - } - } + LOG.trace( + "Worker '{}' asked for '{}' jobs of type '{}', but none are available. This request will" + + " be kept open until a new job of this type is created or until timeout of '{}'.", + request.getWorker(), + request.getMaxJobsToActivate(), + request.getType(), + request.getLongPollingTimeout(longPollingTimeout)); + state.enqueueRequest(request); } private void addTimeOut( @@ -249,8 +263,8 @@ private void addTimeOut( actor.runDelayed( requestTimeout, () -> { - state.removeRequest(request); request.timeout(); + state.removeRequest(request); }); request.setScheduledTimer(timeout); } diff --git a/gateway/src/test/java/io/camunda/zeebe/gateway/api/job/LongPollingActivateJobsTest.java b/gateway/src/test/java/io/camunda/zeebe/gateway/api/job/LongPollingActivateJobsTest.java index 8a6cf5775ba1..a6ecb1527d5d 100644 --- a/gateway/src/test/java/io/camunda/zeebe/gateway/api/job/LongPollingActivateJobsTest.java +++ b/gateway/src/test/java/io/camunda/zeebe/gateway/api/job/LongPollingActivateJobsTest.java @@ -10,6 +10,7 @@ import static io.camunda.zeebe.test.util.TestUtil.waitUntil; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; @@ -43,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.Before; @@ -596,6 +598,57 @@ public BrokerResponse handle(final BrokerActivateJobsRequest request) assertThat(request.hasScheduledTimer()).isFalse(); } + @Test + public void shouldCompleteRequestImmediatellyDespiteNotification() throws Exception { + // given + final LongPollingActivateJobsRequest request = + new LongPollingActivateJobsRequest( + ActivateJobsRequest.newBuilder() + .setType(TYPE) + .setRequestTimeout(-1) + .setMaxJobsToActivate(1) + .build(), + spy(ServerStreamObserver.class)); + + registerCustomHandlerWithNotification( + (r) -> { + final var partitionId = r.getPartitionId(); + if (partitionId == 1) { + brokerClient.notifyJobsAvailable(TYPE); + } + }); + + // when + handler.activateJobs(request); + + // then + waitUntil(request::isCompleted); + verify(stub, times(partitionsCount)).handle(any()); + } + + @Test + public void shouldTimeOutRequestDespiteMultipleNotificationLoops() throws Exception { + // given + final var request = getLongPollingActivateJobsRequest(); + + registerCustomHandlerWithNotification( + (r) -> { + final var partitionId = r.getPartitionId(); + if (partitionId == 1) { + brokerClient.notifyJobsAvailable(TYPE); + } + }); + + // when + handler.activateJobs(request); + waitUntil(request::hasScheduledTimer); + actorClock.addTime(Duration.ofMillis(LONG_POLLING_TIMEOUT)); + waitUntil(request::isTimedOut); + + // then + verify(stub, atLeast(partitionsCount)).handle(any()); + } + private List activateJobsAndWaitUntilBlocked(final int amount) { return IntStream.range(0, amount) .boxed() @@ -624,4 +677,19 @@ private LongPollingActivateJobsRequest getLongPollingActivateJobsRequest(final S return new LongPollingActivateJobsRequest(request, responseSpy); } + + private void registerCustomHandlerWithNotification( + final Consumer notification) { + brokerClient.registerHandler( + BrokerActivateJobsRequest.class, + new RequestHandler>() { + + @Override + public BrokerResponse handle(final BrokerActivateJobsRequest request) + throws Exception { + notification.accept(request); + return stub.handle(request); + } + }); + } } From 31992b38a26fc412a88eb8052a62f5d72a424343 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ole=20Sch=C3=B6nburg?= Date: Mon, 20 Dec 2021 13:41:56 +0100 Subject: [PATCH 2/2] style(gateway): minor code cleanup in long polling classes --- ...tLongPollingActivateJobsRequestsState.java | 20 ++++++++----------- .../job/LongPollingActivateJobsHandler.java | 2 +- .../job/LongPollingActivateJobsRequest.java | 4 ++-- .../api/job/LongPollingActivateJobsTest.java | 4 ++-- 4 files changed, 13 insertions(+), 17 deletions(-) diff --git a/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/InFlightLongPollingActivateJobsRequestsState.java b/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/InFlightLongPollingActivateJobsRequestsState.java index e5ba39231f74..0cc1465df5b4 100644 --- a/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/InFlightLongPollingActivateJobsRequestsState.java +++ b/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/InFlightLongPollingActivateJobsRequestsState.java @@ -7,17 +7,13 @@ */ package io.camunda.zeebe.gateway.impl.job; -import io.camunda.zeebe.gateway.Loggers; import io.camunda.zeebe.gateway.metrics.LongPollingMetrics; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; -import org.slf4j.Logger; public final class InFlightLongPollingActivateJobsRequestsState { - private static final Logger LOGGER = Loggers.GATEWAY_LOGGER; - private final String jobType; private final LongPollingMetrics metrics; private final Queue activeRequests = new LinkedList<>(); @@ -27,7 +23,7 @@ public final class InFlightLongPollingActivateJobsRequestsState { private int failedAttempts; private long lastUpdatedTime; - private AtomicBoolean ongoingNotification = new AtomicBoolean(false); + private final AtomicBoolean ongoingNotification = new AtomicBoolean(false); public InFlightLongPollingActivateJobsRequestsState( final String jobType, final LongPollingMetrics metrics) { @@ -40,13 +36,6 @@ public void incrementFailedAttempts(final long lastUpdatedTime) { this.lastUpdatedTime = lastUpdatedTime; } - public void setFailedAttempts(final int failedAttempts) { - this.failedAttempts = failedAttempts; - if (failedAttempts == 0) { - activeRequestsToBeRepeated.addAll(activeRequests); - } - } - public boolean shouldAttempt(final int attemptThreshold) { return failedAttempts < attemptThreshold; } @@ -59,6 +48,13 @@ public int getFailedAttempts() { return failedAttempts; } + public void setFailedAttempts(final int failedAttempts) { + this.failedAttempts = failedAttempts; + if (failedAttempts == 0) { + activeRequestsToBeRepeated.addAll(activeRequests); + } + } + public long getLastUpdatedTime() { return lastUpdatedTime; } diff --git a/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsHandler.java b/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsHandler.java index 336058b5d4d1..fe7931f98d0b 100644 --- a/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsHandler.java +++ b/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsHandler.java @@ -84,7 +84,7 @@ public void activateJobs( activateJobs(longPollingRequest); } - protected void completeOrResubmitRequest( + private void completeOrResubmitRequest( final LongPollingActivateJobsRequest request, final boolean activateImmediately) { if (request.isLongPollingDisabled()) { // request is not supposed to use the diff --git a/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsRequest.java b/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsRequest.java index fd114165e782..cdd4dffa6b1d 100644 --- a/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsRequest.java +++ b/gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsRequest.java @@ -48,13 +48,13 @@ private LongPollingActivateJobsRequest( final ServerStreamObserver responseObserver, final String jobType, final String worker, - final int maxJobstoActivate, + final int maxJobsToActivate, final long longPollingTimeout) { this.request = request; this.responseObserver = responseObserver; this.jobType = jobType; this.worker = worker; - maxJobsToActivate = maxJobstoActivate; + this.maxJobsToActivate = maxJobsToActivate; this.longPollingTimeout = longPollingTimeout == 0 ? null : Duration.ofMillis(longPollingTimeout); } diff --git a/gateway/src/test/java/io/camunda/zeebe/gateway/api/job/LongPollingActivateJobsTest.java b/gateway/src/test/java/io/camunda/zeebe/gateway/api/job/LongPollingActivateJobsTest.java index a6ecb1527d5d..505b6a654734 100644 --- a/gateway/src/test/java/io/camunda/zeebe/gateway/api/job/LongPollingActivateJobsTest.java +++ b/gateway/src/test/java/io/camunda/zeebe/gateway/api/job/LongPollingActivateJobsTest.java @@ -60,7 +60,7 @@ public final class LongPollingActivateJobsTest { private static final long LONG_POLLING_TIMEOUT = 5000; private static final long PROBE_TIMEOUT = 20000; private static final int FAILED_RESPONSE_THRESHOLD = 3; - protected final ControlledActorClock actorClock = new ControlledActorClock(); + private final ControlledActorClock actorClock = new ControlledActorClock(); @Rule public final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule(actorClock); private LongPollingActivateJobsHandler handler; private ActivateJobsStub stub; @@ -599,7 +599,7 @@ public BrokerResponse handle(final BrokerActivateJobsRequest request) } @Test - public void shouldCompleteRequestImmediatellyDespiteNotification() throws Exception { + public void shouldCompleteRequestImmediatelyDespiteNotification() throws Exception { // given final LongPollingActivateJobsRequest request = new LongPollingActivateJobsRequest(