Skip to content

Commit

Permalink
merge: #8392
Browse files Browse the repository at this point in the history
8392: fix(polling/state): prevent duplicates in repeatable requests list r=oleschoenburg a=romansmirnov

## Description

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

relates #8310
closes #8390



Co-authored-by: Roman <roman.smirnov@camunda.com>
  • Loading branch information
zeebe-bors-cloud[bot] and romansmirnov committed Dec 20, 2021
2 parents 2cf8e90 + 460fc6f commit dc9773d
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@
package io.camunda.zeebe.gateway.impl.job;

import io.camunda.zeebe.gateway.metrics.LongPollingMetrics;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public final class InFlightLongPollingActivateJobsRequestsState {

private final String jobType;
private final LongPollingMetrics metrics;
private final Queue<LongPollingActivateJobsRequest> activeRequests = new LinkedList<>();
private final Queue<LongPollingActivateJobsRequest> activeRequestsToBeRepeated =
new LinkedList<>();
private final Queue<LongPollingActivateJobsRequest> pendingRequests = new LinkedList<>();
private final Set<LongPollingActivateJobsRequest> activeRequestsToBeRepeated = new HashSet<>();
private int failedAttempts;
private long lastUpdatedTime;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;

/**
Expand All @@ -50,6 +51,7 @@ public final class LongPollingActivateJobsHandler extends Actor implements Activ
private final int failedAttemptThreshold;

private final LongPollingMetrics metrics;
private final AtomicLong requestIdGenerator = new AtomicLong(1);

private LongPollingActivateJobsHandler(
final BrokerClient brokerClient,
Expand Down Expand Up @@ -79,8 +81,9 @@ protected void onActorStarted() {
public void activateJobs(
final ActivateJobsRequest request,
final ServerStreamObserver<ActivateJobsResponse> responseObserver) {
final LongPollingActivateJobsRequest longPollingRequest =
new LongPollingActivateJobsRequest(request, responseObserver);
final var requestId = getNextActivateJobsRequestId();
final var longPollingRequest =
new LongPollingActivateJobsRequest(requestId, request, responseObserver);
activateJobs(longPollingRequest);
}

Expand Down Expand Up @@ -127,6 +130,10 @@ public void activateJobs(final LongPollingActivateJobsRequest request) {
});
}

private long getNextActivateJobsRequestId() {
return requestIdGenerator.getAndIncrement();
}

private InFlightLongPollingActivateJobsRequestsState getJobTypeState(final String jobType) {
return jobTypeState.computeIfAbsent(
jobType, type -> new InFlightLongPollingActivateJobsRequestsState(type, metrics));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.util.sched.ScheduledTimer;
import java.time.Duration;
import java.util.Objects;
import org.slf4j.Logger;

public final class LongPollingActivateJobsRequest {

private static final Logger LOG = Loggers.GATEWAY_LOGGER;
private final long requestId;
private final BrokerActivateJobsRequest request;
private final ServerStreamObserver<ActivateJobsResponse> responseObserver;
private final String jobType;
Expand All @@ -32,9 +34,11 @@ public final class LongPollingActivateJobsRequest {
private boolean isCompleted;

public LongPollingActivateJobsRequest(
final long requestId,
final ActivateJobsRequest request,
final ServerStreamObserver<ActivateJobsResponse> responseObserver) {
this(
requestId,
RequestMapper.toActivateJobsRequest(request),
responseObserver,
request.getType(),
Expand All @@ -44,12 +48,14 @@ public LongPollingActivateJobsRequest(
}

private LongPollingActivateJobsRequest(
final long requestId,
final BrokerActivateJobsRequest request,
final ServerStreamObserver<ActivateJobsResponse> responseObserver,
final String jobType,
final String worker,
final int maxJobsToActivate,
final long longPollingTimeout) {
this.requestId = requestId;
this.request = request;
this.responseObserver = responseObserver;
this.jobType = jobType;
Expand Down Expand Up @@ -156,4 +162,27 @@ private void cancelTimerIfScheduled() {
scheduledTimer = null;
}
}

@Override
public int hashCode() {
return Objects.hash(jobType, maxJobsToActivate, requestId, worker);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final var other = (LongPollingActivateJobsRequest) obj;
return Objects.equals(jobType, other.jobType)
&& maxJobsToActivate == other.maxJobsToActivate
&& requestId == other.requestId
&& Objects.equals(worker, other.worker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -60,12 +62,14 @@ public final class LongPollingActivateJobsTest {
private static final long LONG_POLLING_TIMEOUT = 5000;
private static final long PROBE_TIMEOUT = 20000;
private static final int FAILED_RESPONSE_THRESHOLD = 3;
private static final int MAX_JOBS_TO_ACTIVATE = 2;
private final ControlledActorClock actorClock = new ControlledActorClock();
@Rule public final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule(actorClock);
private LongPollingActivateJobsHandler handler;
private ActivateJobsStub stub;
private int partitionsCount;
private final StubbedBrokerClient brokerClient = new StubbedBrokerClient();
private final AtomicLong requestIdGenerator = new AtomicLong(1);

@Before
public void setup() {
Expand Down Expand Up @@ -143,7 +147,7 @@ public void shouldBlockImmediatelyAfterThreshold() throws Exception {
@Test
public void shouldUnblockAllRequestsWhenJobsAvailable() throws Exception {
// given
final int amount = 3;
final int amount = FAILED_RESPONSE_THRESHOLD;
activateJobsAndWaitUntilBlocked(amount);
final int firstRound = amount * partitionsCount;

Expand Down Expand Up @@ -205,7 +209,7 @@ public void shouldNotBlockOtherJobTypes() {
// given
final String otherType = "other-type";
stub.addAvailableJobs(otherType, 2);
final int threshold = 3;
final int threshold = FAILED_RESPONSE_THRESHOLD;
activateJobsAndWaitUntilBlocked(threshold);

// when
Expand Down Expand Up @@ -253,7 +257,7 @@ public void shouldProbeNextRequestWhenBlockedRequestsTimedOut() throws Exception
.build();
actorSchedulerRule.submitActor(handler);

final int threshold = 3;
final int threshold = FAILED_RESPONSE_THRESHOLD;
final List<LongPollingActivateJobsRequest> requests =
activateJobsAndWaitUntilBlocked(threshold);

Expand Down Expand Up @@ -283,7 +287,7 @@ public void shouldUseRequestSpecificTimeout() {
final ServerStreamObserver<ActivateJobsResponse> responseSpy = spy(ServerStreamObserver.class);

final LongPollingActivateJobsRequest longPollingRequest =
new LongPollingActivateJobsRequest(request, responseSpy);
new LongPollingActivateJobsRequest(getNextRequestId(), request, responseSpy);

handler.activateJobs(longPollingRequest);
waitUntil(longPollingRequest::hasScheduledTimer);
Expand All @@ -300,6 +304,7 @@ public void shouldUseLargeRequestTimeout() {
final long requestTimeout = 50000;
final LongPollingActivateJobsRequest shortRequest =
new LongPollingActivateJobsRequest(
getNextRequestId(),
ActivateJobsRequest.newBuilder()
.setType(TYPE)
.setMaxJobsToActivate(1)
Expand All @@ -310,6 +315,7 @@ public void shouldUseLargeRequestTimeout() {
final long longTimeout = 100000;
final LongPollingActivateJobsRequest longRequest =
new LongPollingActivateJobsRequest(
getNextRequestId(),
ActivateJobsRequest.newBuilder()
.setType(TYPE)
.setMaxJobsToActivate(1)
Expand Down Expand Up @@ -339,6 +345,7 @@ public void shouldNotBlockWhenNegativeTimeout() {
// given
final LongPollingActivateJobsRequest request =
new LongPollingActivateJobsRequest(
getNextRequestId(),
ActivateJobsRequest.newBuilder()
.setType(TYPE)
.setMaxJobsToActivate(1)
Expand All @@ -363,6 +370,7 @@ public void shouldNotBlockWhenNegativeTimeout() {
// a request with timeout
final LongPollingActivateJobsRequest request =
new LongPollingActivateJobsRequest(
getNextRequestId(),
ActivateJobsRequest.newBuilder()
.setType(TYPE)
.setMaxJobsToActivate(15)
Expand Down Expand Up @@ -427,6 +435,7 @@ public BrokerResponse<JobBatchRecord> handle(final BrokerActivateJobsRequest req
// given
final LongPollingActivateJobsRequest request =
new LongPollingActivateJobsRequest(
getNextRequestId(),
ActivateJobsRequest.newBuilder()
.setType(TYPE)
.setMaxJobsToActivate(15)
Expand Down Expand Up @@ -472,6 +481,7 @@ public void shouldReturnJobsIfSomeBrokersHaveJobsWhileOthersReturnResourceExhaus
// given
final LongPollingActivateJobsRequest request =
new LongPollingActivateJobsRequest(
getNextRequestId(),
ActivateJobsRequest.newBuilder()
.setType(TYPE)
.setMaxJobsToActivate(15)
Expand Down Expand Up @@ -517,6 +527,52 @@ public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
assertThat(response.getJobsList()).hasSize(10);
}

@Test
public void shouldRepeatRequestOnlyOnce() throws Exception {
// given
// the first three requests activates jobs
final var firstRequest = getLongPollingActivateJobsRequest();
final var secondRequest = getLongPollingActivateJobsRequest();
final var thirdRequest = getLongPollingActivateJobsRequest();
// the last request does not activate any jobs
final var fourthRequest = getLongPollingActivateJobsRequest();

final var allRequestsSubmittedLatch = new CountDownLatch(1);
registerCustomHandlerWithNotification(
(r) -> {
try {
// ensure that all requests are submitted to
// the actor jobs queue before executing those
allRequestsSubmittedLatch.await();
} catch (InterruptedException e) {
// ignore
}
});

stub.addAvailableJobs(TYPE, 3 * MAX_JOBS_TO_ACTIVATE);

// when
handler.activateJobs(firstRequest);
handler.activateJobs(secondRequest);
handler.activateJobs(thirdRequest);
handler.activateJobs(fourthRequest);

allRequestsSubmittedLatch.countDown();
waitUntil(fourthRequest::hasScheduledTimer);
actorClock.addTime(Duration.ofMillis(LONG_POLLING_TIMEOUT));
waitUntil(fourthRequest::isTimedOut);

// then
assertThat(firstRequest.isCompleted()).isTrue();
assertThat(secondRequest.isCompleted()).isTrue();
assertThat(thirdRequest.isCompleted()).isTrue();

verify(stub, times(1)).handle(firstRequest.getRequest());
verify(stub, times(1)).handle(secondRequest.getRequest());
verify(stub, times(1)).handle(thirdRequest.getRequest());
verify(stub, times(partitionsCount * 2)).handle(fourthRequest.getRequest());
}

@Test
public void shouldCancelTimerOnResourceExhausted() {
// given
Expand Down Expand Up @@ -603,6 +659,7 @@ public void shouldCompleteRequestImmediatelyDespiteNotification() throws Excepti
// given
final LongPollingActivateJobsRequest request =
new LongPollingActivateJobsRequest(
getNextRequestId(),
ActivateJobsRequest.newBuilder()
.setType(TYPE)
.setRequestTimeout(-1)
Expand Down Expand Up @@ -667,15 +724,19 @@ private LongPollingActivateJobsRequest getLongPollingActivateJobsRequest() {
}

private LongPollingActivateJobsRequest getLongPollingActivateJobsRequest(final String jobType) {
final int maxJobsToActivate = 2;
final ActivateJobsRequest request =
final var requestId = getNextRequestId();
final var request =
ActivateJobsRequest.newBuilder()
.setType(jobType)
.setMaxJobsToActivate(maxJobsToActivate)
.setMaxJobsToActivate(MAX_JOBS_TO_ACTIVATE)
.build();
final ServerStreamObserver<ActivateJobsResponse> responseSpy = spy(ServerStreamObserver.class);
final var responseSpy = spy(ServerStreamObserver.class);

return new LongPollingActivateJobsRequest(requestId, request, responseSpy);
}

return new LongPollingActivateJobsRequest(request, responseSpy);
private long getNextRequestId() {
return requestIdGenerator.getAndIncrement();
}

private void registerCustomHandlerWithNotification(
Expand Down

0 comments on commit dc9773d

Please sign in to comment.