Skip to content

Commit

Permalink
merge: #8391
Browse files Browse the repository at this point in the history
8391: fix(polling): respect request timeout settings r=oleschoenburg a=romansmirnov

## Description

* If long polling is disabled by the received request, then always complete the request immediately even when no jobs are activated.
* Ensure that the provided request timeout is respected so that the request completes, at the latest, once the given timeout has elapsed.

## Related issues

<!-- Which issues are closed by this PR or are related -->

relates #8310 
closes #8389 



Co-authored-by: Roman <roman.smirnov@camunda.com>
Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
3 people committed Dec 20, 2021
2 parents 49026b1 + 31992b3 commit 2c754a7
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,13 @@
*/
package io.camunda.zeebe.gateway.impl.job;

import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.metrics.LongPollingMetrics;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;

public final class InFlightLongPollingActivateJobsRequestsState {

private static final Logger LOGGER = Loggers.GATEWAY_LOGGER;

private final String jobType;
private final LongPollingMetrics metrics;
private final Queue<LongPollingActivateJobsRequest> activeRequests = new LinkedList<>();
Expand All @@ -27,7 +23,7 @@ public final class InFlightLongPollingActivateJobsRequestsState {
private int failedAttempts;
private long lastUpdatedTime;

private AtomicBoolean ongoingNotification = new AtomicBoolean(false);
private final AtomicBoolean ongoingNotification = new AtomicBoolean(false);

public InFlightLongPollingActivateJobsRequestsState(
final String jobType, final LongPollingMetrics metrics) {
Expand All @@ -40,11 +36,8 @@ public void incrementFailedAttempts(final long lastUpdatedTime) {
this.lastUpdatedTime = lastUpdatedTime;
}

public void setFailedAttempts(final int failedAttempts) {
this.failedAttempts = failedAttempts;
if (failedAttempts == 0) {
activeRequestsToBeRepeated.addAll(activeRequests);
}
public boolean shouldAttempt(final int attemptThreshold) {
return failedAttempts < attemptThreshold;
}

public void resetFailedAttempts() {
Expand All @@ -55,6 +48,13 @@ public int getFailedAttempts() {
return failedAttempts;
}

/**
 * Overwrites the failed-attempt counter for this job type.
 *
 * <p>When the counter is set to {@code 0}, all currently active requests are additionally added
 * to the collection of requests to be repeated.
 *
 * @param failedAttempts the new value of the failed-attempt counter
 */
public void setFailedAttempts(final int failedAttempts) {
  this.failedAttempts = failedAttempts;

  if (failedAttempts != 0) {
    // only a reset to zero flags the in-flight requests for repetition
    return;
  }

  activeRequestsToBeRepeated.addAll(activeRequests);
}

/**
 * Returns the value currently held in {@code lastUpdatedTime}; {@code
 * incrementFailedAttempts(long)} assigns this field from its argument.
 *
 * @return the stored last-updated timestamp
 */
public long getLastUpdatedTime() {
  return this.lastUpdatedTime;
}
Expand Down Expand Up @@ -112,10 +112,11 @@ public boolean hasActiveRequests() {

/**
* Returns whether the request should be repeated. A request should be repeated if the failed
* attempts were reset to 0 (because new jobs became available) whilst the request was running
* attempts were reset to 0 (because new jobs became available) whilst the request was running,
* and if the request's long polling is enabled.
*/
public boolean shouldBeRepeated(final LongPollingActivateJobsRequest request) {
return activeRequestsToBeRepeated.contains(request);
return activeRequestsToBeRepeated.contains(request) && !request.isLongPollingDisabled();
}

public boolean shouldNotifyAndStartNotification() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,45 @@ public void activateJobs(
activateJobs(longPollingRequest);
}

/**
 * Decides what happens to a request for which no further immediate handling is pending.
 *
 * <ul>
 *   <li>If long polling is disabled for the request, it is completed right away.
 *   <li>If the request has already timed out, nothing is done.
 *   <li>Otherwise a timeout timer is scheduled (unless one is already scheduled) and the request
 *       is either activated again or enqueued on the state of its job type.
 * </ul>
 *
 * @param request the request to complete, re-activate, or enqueue
 * @param activateImmediately {@code true} to call {@code activateJobs(request)} again, {@code
 *     false} to enqueue the request
 */
private void completeOrResubmitRequest(
    final LongPollingActivateJobsRequest request, final boolean activateImmediately) {
  if (request.isLongPollingDisabled()) {
    // the request opted out of long polling, so it must not be kept open
    request.complete();
    return;
  }

  if (!request.isTimedOut()) {
    final var jobTypeState = getJobTypeState(request.getType());

    // make sure exactly one timeout timer exists before the request is handed on
    if (!request.hasScheduledTimer()) {
      addTimeOut(jobTypeState, request);
    }

    if (!activateImmediately) {
      enqueueRequest(jobTypeState, request);
    } else {
      activateJobs(request);
    }
  }
  // a request that already timed out needs no further handling
}

public void activateJobs(final LongPollingActivateJobsRequest request) {
actor.run(
() -> {
final InFlightLongPollingActivateJobsRequestsState state =
getJobTypeState(request.getType());

if (state.getFailedAttempts() < failedAttemptThreshold) {
if (state.shouldAttempt(failedAttemptThreshold)) {
activateJobsUnchecked(state, request);
} else {
completeOrEnqueueRequest(state, request);
completeOrResubmitRequest(request, false);
}
});
}
Expand Down Expand Up @@ -168,15 +197,9 @@ private void onCompleted(
actor.submit(
() -> {
state.incrementFailedAttempts(currentTimeMillis());

final boolean shouldBeRepeated = state.shouldBeRepeated(request);
state.removeActiveRequest(request);

if (shouldBeRepeated) {
activateJobs(request);
} else {
completeOrEnqueueRequest(getJobTypeState(request.getType()), request);
}
completeOrResubmitRequest(request, shouldBeRepeated);
});
}
} else {
Expand Down Expand Up @@ -219,26 +242,17 @@ private void resetFailedAttemptsAndHandlePendingRequests(final String jobType) {
}
}

private void completeOrEnqueueRequest(
private void enqueueRequest(
final InFlightLongPollingActivateJobsRequestsState state,
final LongPollingActivateJobsRequest request) {
if (request.isLongPollingDisabled()) {
request.complete();
return;
}
if (!request.isTimedOut()) {
LOG.trace(
"Worker '{}' asked for '{}' jobs of type '{}', but none are available. This request will"
+ " be kept open until a new job of this type is created or until timeout of '{}'.",
request.getWorker(),
request.getMaxJobsToActivate(),
request.getType(),
request.getLongPollingTimeout(longPollingTimeout));
state.enqueueRequest(request);
if (!request.hasScheduledTimer()) {
addTimeOut(state, request);
}
}
LOG.trace(
"Worker '{}' asked for '{}' jobs of type '{}', but none are available. This request will"
+ " be kept open until a new job of this type is created or until timeout of '{}'.",
request.getWorker(),
request.getMaxJobsToActivate(),
request.getType(),
request.getLongPollingTimeout(longPollingTimeout));
state.enqueueRequest(request);
}

private void addTimeOut(
Expand All @@ -249,8 +263,8 @@ private void addTimeOut(
actor.runDelayed(
requestTimeout,
() -> {
state.removeRequest(request);
request.timeout();
state.removeRequest(request);
});
request.setScheduledTimer(timeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ private LongPollingActivateJobsRequest(
final ServerStreamObserver<ActivateJobsResponse> responseObserver,
final String jobType,
final String worker,
final int maxJobstoActivate,
final int maxJobsToActivate,
final long longPollingTimeout) {
this.request = request;
this.responseObserver = responseObserver;
this.jobType = jobType;
this.worker = worker;
maxJobsToActivate = maxJobstoActivate;
this.maxJobsToActivate = maxJobsToActivate;
this.longPollingTimeout =
longPollingTimeout == 0 ? null : Duration.ofMillis(longPollingTimeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.camunda.zeebe.test.util.TestUtil.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
Expand Down Expand Up @@ -43,6 +44,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Before;
Expand All @@ -58,7 +60,7 @@ public final class LongPollingActivateJobsTest {
private static final long LONG_POLLING_TIMEOUT = 5000;
private static final long PROBE_TIMEOUT = 20000;
private static final int FAILED_RESPONSE_THRESHOLD = 3;
protected final ControlledActorClock actorClock = new ControlledActorClock();
private final ControlledActorClock actorClock = new ControlledActorClock();
@Rule public final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule(actorClock);
private LongPollingActivateJobsHandler handler;
private ActivateJobsStub stub;
Expand Down Expand Up @@ -596,6 +598,57 @@ public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
assertThat(request.hasScheduledTimer()).isFalse();
}

/**
 * Verifies that a request built with a request timeout of {@code -1} is completed even though a
 * jobs-available notification is published while partition 1 is being polled, and that the stub
 * is invoked exactly {@code partitionsCount} times.
 */
@Test
public void shouldCompleteRequestImmediatelyDespiteNotification() throws Exception {
  // given
  // NOTE(review): a timeout of -1 presumably disables long polling - confirm against
  // LongPollingActivateJobsRequest#isLongPollingDisabled
  final var grpcRequest =
      ActivateJobsRequest.newBuilder()
          .setType(TYPE)
          .setRequestTimeout(-1)
          .setMaxJobsToActivate(1)
          .build();
  final LongPollingActivateJobsRequest request =
      new LongPollingActivateJobsRequest(grpcRequest, spy(ServerStreamObserver.class));

  // publish a notification for the job type whenever partition 1 is polled
  registerCustomHandlerWithNotification(
      brokerRequest -> {
        if (brokerRequest.getPartitionId() == 1) {
          brokerClient.notifyJobsAvailable(TYPE);
        }
      });

  // when
  handler.activateJobs(request);

  // then
  waitUntil(() -> request.isCompleted());
  verify(stub, times(partitionsCount)).handle(any());
}

/**
 * Verifies that a long polling request still times out when a jobs-available notification is
 * published every time partition 1 is polled, and that the stub is invoked at least {@code
 * partitionsCount} times.
 */
@Test
public void shouldTimeOutRequestDespiteMultipleNotificationLoops() throws Exception {
  // given
  final LongPollingActivateJobsRequest request = getLongPollingActivateJobsRequest();

  // publish a notification for the job type whenever partition 1 is polled
  registerCustomHandlerWithNotification(
      brokerRequest -> {
        if (brokerRequest.getPartitionId() == 1) {
          brokerClient.notifyJobsAvailable(TYPE);
        }
      });

  // when
  handler.activateJobs(request);
  waitUntil(() -> request.hasScheduledTimer());

  // move the controlled clock forward by LONG_POLLING_TIMEOUT, then wait for the timeout flag
  final Duration elapsed = Duration.ofMillis(LONG_POLLING_TIMEOUT);
  actorClock.addTime(elapsed);
  waitUntil(() -> request.isTimedOut());

  // then
  verify(stub, atLeast(partitionsCount)).handle(any());
}

private List<LongPollingActivateJobsRequest> activateJobsAndWaitUntilBlocked(final int amount) {
return IntStream.range(0, amount)
.boxed()
Expand Down Expand Up @@ -624,4 +677,19 @@ private LongPollingActivateJobsRequest getLongPollingActivateJobsRequest(final S

return new LongPollingActivateJobsRequest(request, responseSpy);
}

/**
 * Registers a handler for {@link BrokerActivateJobsRequest} on the broker client that first
 * passes every incoming request to the given callback and then delegates to {@code stub}.
 *
 * @param notification callback invoked with each request before the stub handles it
 */
private void registerCustomHandlerWithNotification(
final Consumer<BrokerActivateJobsRequest> notification) {
brokerClient.registerHandler(
BrokerActivateJobsRequest.class,
new RequestHandler<BrokerActivateJobsRequest, BrokerResponse<?>>() {

@Override
public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
throws Exception {
// run the test-supplied side effect first, then produce the regular stubbed response
notification.accept(request);
return stub.handle(request);
}
});
}
}

0 comments on commit 2c754a7

Please sign in to comment.