Skip to content

Commit

Permalink
merge: #8447 #8449
Browse files Browse the repository at this point in the history
8447: [Backport stable/1.1] deps(maven): bump version.elasticsearch from 7.16.1 to 7.16.2 r=menski a=npepinpe

Bumps `version.elasticsearch` from 7.16.1 to 7.16.2.

Updates `elasticsearch-x-content` from 7.16.1 to 7.16.2
- [Release notes](https://github.com/elastic/elasticsearch/releases)
- [Commits](elastic/elasticsearch@v7.16.1...v7.16.2)

Updates `elasticsearch-rest-client` from 7.16.1 to 7.16.2
- [Release notes](https://github.com/elastic/elasticsearch/releases)
- [Commits](elastic/elasticsearch@v7.16.1...v7.16.2)

---
updated-dependencies:
- dependency-name: org.elasticsearch:elasticsearch-x-content
  dependency-type: direct:production
  update-type: version-update:semver-patch
- dependency-name: org.elasticsearch.client:elasticsearch-rest-client
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
(cherry picked from commit 4f41e4f)

8449: [Backport stable/1.1] fix(polling): respect request timeout settings r=oleschoenburg a=github-actions[bot]

# Description
Backport of #8391 to `stable/1.1`.

relates to #8310 #8389

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Sebastian Menski <sebastian.menski@camunda.com>
Co-authored-by: Roman <roman.smirnov@camunda.com>
Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
5 people committed Dec 20, 2021
3 parents 913e0e5 + c5a091a + 4a24a26 commit f9a9f1f
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 46 deletions.
4 changes: 2 additions & 2 deletions exporters/elasticsearch-exporter/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3'

services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.1
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.2
ports:
- "9200:9200"
- "9300:9300"
Expand All @@ -12,7 +12,7 @@ services:
- ES_JAVA_OPTS=-Xmx750m -Xms750m

kibana:
image: docker.elastic.co/kibana/kibana:7.16.1
image: docker.elastic.co/kibana/kibana:7.16.2
ports:
- "5601:5601"
links:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,13 @@
*/
package io.camunda.zeebe.gateway.impl.job;

import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.metrics.LongPollingMetrics;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;

public final class InFlightLongPollingActivateJobsRequestsState {

private static final Logger LOGGER = Loggers.GATEWAY_LOGGER;

private final String jobType;
private final LongPollingMetrics metrics;
private final Queue<LongPollingActivateJobsRequest> activeRequests = new LinkedList<>();
Expand All @@ -27,7 +23,7 @@ public final class InFlightLongPollingActivateJobsRequestsState {
private int failedAttempts;
private long lastUpdatedTime;

private AtomicBoolean ongoingNotification = new AtomicBoolean(false);
private final AtomicBoolean ongoingNotification = new AtomicBoolean(false);

public InFlightLongPollingActivateJobsRequestsState(
final String jobType, final LongPollingMetrics metrics) {
Expand All @@ -40,11 +36,8 @@ public void incrementFailedAttempts(final long lastUpdatedTime) {
this.lastUpdatedTime = lastUpdatedTime;
}

public void setFailedAttempts(final int failedAttempts) {
this.failedAttempts = failedAttempts;
if (failedAttempts == 0) {
activeRequestsToBeRepeated.addAll(activeRequests);
}
public boolean shouldAttempt(final int attemptThreshold) {
return failedAttempts < attemptThreshold;
}

public void resetFailedAttempts() {
Expand All @@ -55,6 +48,13 @@ public int getFailedAttempts() {
return failedAttempts;
}

public void setFailedAttempts(final int failedAttempts) {
this.failedAttempts = failedAttempts;
if (failedAttempts == 0) {
activeRequestsToBeRepeated.addAll(activeRequests);
}
}

public long getLastUpdatedTime() {
return lastUpdatedTime;
}
Expand Down Expand Up @@ -112,10 +112,11 @@ public boolean hasActiveRequests() {

/**
* Returns whether the request should be repeated. A request should be repeated if the failed
* attempts were reset to 0 (because new jobs became available) whilst the request was running
* attempts were reset to 0 (because new jobs became available) whilst the request was running,
* and if the request's long polling is enabled.
*/
public boolean shouldBeRepeated(final LongPollingActivateJobsRequest request) {
return activeRequestsToBeRepeated.contains(request);
return activeRequestsToBeRepeated.contains(request) && !request.isLongPollingDisabled();
}

public boolean shouldNotifyAndStartNotification() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,45 @@ public void activateJobs(
activateJobs(longPollingRequest);
}

private void completeOrResubmitRequest(
final LongPollingActivateJobsRequest request, final boolean activateImmediately) {
if (request.isLongPollingDisabled()) {
// request is not supposed to use the
// long polling capabilities -> just
// complete the request
request.complete();
return;
}

if (request.isTimedOut()) {
// already timed out, nothing to do here
return;
}

final var type = request.getType();
final var state = getJobTypeState(type);

if (!request.hasScheduledTimer()) {
addTimeOut(state, request);
}

if (activateImmediately) {
activateJobs(request);
} else {
enqueueRequest(state, request);
}
}

public void activateJobs(final LongPollingActivateJobsRequest request) {
actor.run(
() -> {
final InFlightLongPollingActivateJobsRequestsState state =
getJobTypeState(request.getType());

if (state.getFailedAttempts() < failedAttemptThreshold) {
if (state.shouldAttempt(failedAttemptThreshold)) {
activateJobsUnchecked(state, request);
} else {
completeOrEnqueueRequest(state, request);
completeOrResubmitRequest(request, false);
}
});
}
Expand Down Expand Up @@ -169,15 +198,9 @@ private void onCompleted(
actor.submit(
() -> {
state.incrementFailedAttempts(currentTimeMillis());

final boolean shouldBeRepeated = state.shouldBeRepeated(request);
state.removeActiveRequest(request);

if (shouldBeRepeated) {
activateJobs(request);
} else {
completeOrEnqueueRequest(getJobTypeState(request.getType()), request);
}
completeOrResubmitRequest(request, shouldBeRepeated);
});
}
} else {
Expand Down Expand Up @@ -220,26 +243,17 @@ private void resetFailedAttemptsAndHandlePendingRequests(final String jobType) {
}
}

private void completeOrEnqueueRequest(
private void enqueueRequest(
final InFlightLongPollingActivateJobsRequestsState state,
final LongPollingActivateJobsRequest request) {
if (request.isLongPollingDisabled()) {
request.complete();
return;
}
if (!request.isTimedOut()) {
LOG.trace(
"Worker '{}' asked for '{}' jobs of type '{}', but none are available. This request will"
+ " be kept open until a new job of this type is created or until timeout of '{}'.",
request.getWorker(),
request.getMaxJobsToActivate(),
request.getType(),
request.getLongPollingTimeout(longPollingTimeout));
state.enqueueRequest(request);
if (!request.hasScheduledTimer()) {
addTimeOut(state, request);
}
}
LOG.trace(
"Worker '{}' asked for '{}' jobs of type '{}', but none are available. This request will"
+ " be kept open until a new job of this type is created or until timeout of '{}'.",
request.getWorker(),
request.getMaxJobsToActivate(),
request.getType(),
request.getLongPollingTimeout(longPollingTimeout));
state.enqueueRequest(request);
}

private void addTimeOut(
Expand All @@ -251,8 +265,8 @@ private void addTimeOut(
actor.runDelayed(
requestTimeout,
() -> {
state.removeRequest(request);
request.timeout();
state.removeRequest(request);
});
request.setScheduledTimer(timeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ private LongPollingActivateJobsRequest(
final ServerStreamObserver<ActivateJobsResponse> responseObserver,
final String jobType,
final String worker,
final int maxJobstoActivate,
final int maxJobsToActivate,
final long longPollingTimeout) {
this.request = request;
this.responseObserver = responseObserver;
this.jobType = jobType;
this.worker = worker;
maxJobsToActivate = maxJobstoActivate;
this.maxJobsToActivate = maxJobsToActivate;
this.longPollingTimeout =
longPollingTimeout == 0 ? null : Duration.ofMillis(longPollingTimeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.camunda.zeebe.test.util.TestUtil.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
Expand Down Expand Up @@ -43,6 +44,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Before;
Expand All @@ -58,7 +60,7 @@ public final class LongPollingActivateJobsTest {
private static final long LONG_POLLING_TIMEOUT = 5000;
private static final long PROBE_TIMEOUT = 20000;
private static final int FAILED_RESPONSE_THRESHOLD = 3;
protected final ControlledActorClock actorClock = new ControlledActorClock();
private final ControlledActorClock actorClock = new ControlledActorClock();
@Rule public final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule(actorClock);
private LongPollingActivateJobsHandler handler;
private ActivateJobsStub stub;
Expand Down Expand Up @@ -596,6 +598,57 @@ public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
assertThat(request.hasScheduledTimer()).isFalse();
}

@Test
public void shouldCompleteRequestImmediatelyDespiteNotification() throws Exception {
// given
final LongPollingActivateJobsRequest request =
new LongPollingActivateJobsRequest(
ActivateJobsRequest.newBuilder()
.setType(TYPE)
.setRequestTimeout(-1)
.setMaxJobsToActivate(1)
.build(),
spy(ServerStreamObserver.class));

registerCustomHandlerWithNotification(
(r) -> {
final var partitionId = r.getPartitionId();
if (partitionId == 1) {
brokerClient.notifyJobsAvailable(TYPE);
}
});

// when
handler.activateJobs(request);

// then
waitUntil(request::isCompleted);
verify(stub, times(partitionsCount)).handle(any());
}

@Test
public void shouldTimeOutRequestDespiteMultipleNotificationLoops() throws Exception {
// given
final var request = getLongPollingActivateJobsRequest();

registerCustomHandlerWithNotification(
(r) -> {
final var partitionId = r.getPartitionId();
if (partitionId == 1) {
brokerClient.notifyJobsAvailable(TYPE);
}
});

// when
handler.activateJobs(request);
waitUntil(request::hasScheduledTimer);
actorClock.addTime(Duration.ofMillis(LONG_POLLING_TIMEOUT));
waitUntil(request::isTimedOut);

// then
verify(stub, atLeast(partitionsCount)).handle(any());
}

private List<LongPollingActivateJobsRequest> activateJobsAndWaitUntilBlocked(final int amount) {
return IntStream.range(0, amount)
.boxed()
Expand Down Expand Up @@ -624,4 +677,19 @@ private LongPollingActivateJobsRequest getLongPollingActivateJobsRequest(final S

return new LongPollingActivateJobsRequest(request, responseSpy);
}

private void registerCustomHandlerWithNotification(
final Consumer<BrokerActivateJobsRequest> notification) {
brokerClient.registerHandler(
BrokerActivateJobsRequest.class,
new RequestHandler<BrokerActivateJobsRequest, BrokerResponse<?>>() {

@Override
public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
throws Exception {
notification.accept(request);
return stub.handle(request);
}
});
}
}
2 changes: 1 addition & 1 deletion parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
<version.commons-math>3.6.1</version.commons-math>
<version.commons-codec>1.15</version.commons-codec>
<version.docker-java-api>3.2.8</version.docker-java-api>
<version.elasticsearch>7.16.1</version.elasticsearch>
<version.elasticsearch>7.16.2</version.elasticsearch>
<version.error-prone>2.7.1</version.error-prone>
<version.grpc>1.42.0</version.grpc>
<version.gson>2.8.9</version.gson>
Expand Down

0 comments on commit f9a9f1f

Please sign in to comment.