Skip to content

Commit

Permalink
merge: #10844
Browse files Browse the repository at this point in the history
10844: Yield actor thread between retries r=npepinpe a=npepinpe

## Description

Ensures retry strategies are yielding their thread between retries. Note that this doesn't really apply to the `BackOffRetryStrategy`, as it's already yielding via its usage of timers.

This PR also updates the `RetryStrategyTest` to junit 5, and adds test coverage by applying the existing tests to the other strategies where applicable (previously, only the `RecoverableRetryStrategy` and `AbortableRetryStrategy` were tested there).

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #10539 



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and npepinpe committed Oct 31, 2022
2 parents 991fb6a + d2999fd commit 4ef03a8
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 77 deletions.
5 changes: 5 additions & 0 deletions scheduler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ private void run() {
final var control = retryMechanism.run();
if (control == Control.RETRY) {
actor.run(this::run);
actor.yieldThread();
}
} catch (final Exception exception) {
currentFuture.completeExceptionally(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ private void run() {
final var control = retryMechanism.run();
if (control == Control.RETRY) {
actor.run(this::run);
actor.yieldThread();
}
} catch (final Exception exception) {
if (terminateCondition.getAsBoolean()) {
currentFuture.complete(false);
} else {
actor.run(this::run);
actor.yieldThread();
LOG.error(
"Caught exception {} with message {}, will retry...",
exception.getClass(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ private void run() {
final var control = retryMechanism.run();
if (control == Control.RETRY) {
actor.run(this::run);
actor.yieldThread();
}
} catch (final RecoverableException ex) {
if (!terminateCondition.getAsBoolean()) {
actor.run(this::run);
actor.yieldThread();
}
} catch (final Exception exception) {
currentFuture.completeExceptionally(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,140 +12,128 @@
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.testing.ControlledActorSchedulerRule;
import java.lang.reflect.Constructor;
import io.camunda.zeebe.scheduler.testing.ControlledActorSchedulerExtension;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public final class RetryStrategyTest {

@Rule
public final ControlledActorSchedulerRule schedulerRule = new ControlledActorSchedulerRule();

@Parameter public Class<RetryStrategy> retryStrategyClass;
private RetryStrategy retryStrategy;
private ActorControl actorControl;
private ActorFuture<Boolean> resultFuture;
import java.util.function.Function;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Parameters(name = "{index}: {0}")
public static Object[][] reprocessingTriggers() {
return new Object[][] {
new Object[] {RecoverableRetryStrategy.class}, new Object[] {AbortableRetryStrategy.class}
};
}
final class RetryStrategyTest {

@Before
public void setUp() {
final ControllableActor actor = new ControllableActor();
actorControl = actor.getActor();

try {
final Constructor<RetryStrategy> constructor =
retryStrategyClass.getConstructor(ActorControl.class);
retryStrategy = constructor.newInstance(actorControl);
} catch (final Exception e) {
throw new RuntimeException(e);
}
/** Ensure we use a single thread to better control the scheduling in the tests. */
@RegisterExtension
private final ControlledActorSchedulerExtension schedulerRule =
new ControlledActorSchedulerExtension(
builder -> builder.setIoBoundActorThreadCount(0).setCpuBoundActorThreadCount(1));

schedulerRule.submitActor(actor);
}
private ActorFuture<Boolean> resultFuture;

@Test
public void shouldRunUntilDone() throws Exception {
@ParameterizedTest
@ValueSource(strings = {"endless", "recoverable", "abortable", "backoff"})
void shouldRunUntilDone(final TestCase<?> test) {
// given
final AtomicInteger count = new AtomicInteger(0);
final var count = new AtomicInteger(0);
schedulerRule.submitActor(test.actor);

// when
actorControl.run(
() -> {
resultFuture = retryStrategy.runWithRetry(() -> count.incrementAndGet() == 10);
});

test.actor.run(
() -> resultFuture = test.strategy.runWithRetry(() -> count.incrementAndGet() == 10));
schedulerRule.workUntilDone();

// then
assertThat(count.get()).isEqualTo(10);
assertThat(resultFuture.isDone()).isTrue();
assertThat(resultFuture.get()).isTrue();
assertThat(resultFuture).succeedsWithin(Duration.ZERO).isEqualTo(true);
}

@Test
public void shouldStopWhenAbortConditionReturnsTrue() throws Exception {
@ParameterizedTest
@ValueSource(strings = {"endless", "recoverable", "abortable", "backoff"})
void shouldStopWhenAbortConditionReturnsTrue(final TestCase<?> test) {
// given
final AtomicInteger count = new AtomicInteger(0);
schedulerRule.submitActor(test.actor);

// when
actorControl.run(
() -> {
resultFuture =
retryStrategy.runWithRetry(() -> false, () -> count.incrementAndGet() == 10);
});

test.actor.run(
() ->
resultFuture =
test.strategy.runWithRetry(() -> false, () -> count.incrementAndGet() == 10));
schedulerRule.workUntilDone();

// then
assertThat(count.get()).isEqualTo(10);
assertThat(resultFuture.isDone()).isTrue();
assertThat(resultFuture.get()).isFalse();
assertThat(resultFuture).succeedsWithin(Duration.ZERO).isEqualTo(false);
}

@Test
public void shouldAbortOnOtherException() {
/**
* Only the {@link RecoverableRetryStrategy} and {@link AbortableRetryStrategy} stop retrying when
* an unrecoverable exception occurs; the others will always retry. We may want to extract this to
* specific class tests?
*/
@ParameterizedTest
@ValueSource(strings = {"recoverable", "abortable"})
void shouldAbortOnOtherException(final TestCase<?> test) {
// given
final RuntimeException failure = new RuntimeException("expected");
schedulerRule.submitActor(test.actor);

// when
actorControl.run(
test.actor.run(
() ->
resultFuture =
retryStrategy.runWithRetry(
test.strategy.runWithRetry(
() -> {
throw new RuntimeException("expected");
throw failure;
}));

schedulerRule.workUntilDone();

// then
assertThat(resultFuture.isDone()).isTrue();
assertThat(resultFuture.isCompletedExceptionally()).isTrue();
assertThat(resultFuture.getException()).isExactlyInstanceOf(RuntimeException.class);
assertThat(resultFuture)
.failsWithin(Duration.ZERO)
.withThrowableOfType(ExecutionException.class)
.withCause(failure);
}

@Test
public void shouldNotInterleaveRetry() {
/**
* The {@link BackOffRetryStrategy} is excluded here because its usage of timers necessarily allow
* interleaving calls. If we decide to fix it, then we should refactor the strategy and add it as
* a test case here.
*/
@ParameterizedTest
@ValueSource(strings = {"endless", "recoverable", "abortable"})
void shouldNotInterleaveRetry(final TestCase<?> test) {
// given
final AtomicReference<ActorFuture<Boolean>> firstFuture = new AtomicReference<>();
final AtomicReference<ActorFuture<Boolean>> secondFuture = new AtomicReference<>();

final AtomicInteger executionAttempt = new AtomicInteger(0);
final AtomicInteger firstResult = new AtomicInteger();
final AtomicInteger secondResult = new AtomicInteger();
schedulerRule.submitActor(test.actor);

// when
final var retryCounts = 5;
actorControl.run(
test.actor.run(
() ->
firstFuture.set(
retryStrategy.runWithRetry(
test.strategy.runWithRetry(
() -> {
firstResult.set(executionAttempt.getAndIncrement());
return executionAttempt.get() >= retryCounts;
})));
actorControl.run(
test.actor.run(
() ->
secondFuture.set(
retryStrategy.runWithRetry(
test.strategy.runWithRetry(
() -> {
secondResult.set(executionAttempt.getAndIncrement());
return true;
})));

schedulerRule.workUntilDone();

// then
Expand All @@ -155,6 +143,74 @@ public void shouldNotInterleaveRetry() {
assertThat(secondResult).hasValue(retryCounts);
}

/**
* The {@link BackOffRetryStrategy} is excluded here as it is already yielding implicitly by using
* timers for scheduling.
*/
@ParameterizedTest
@ValueSource(strings = {"endless", "recoverable", "abortable"})
void shouldYieldThreadOnRetry(final TestCase<?> test) {
// given - all actors share the same thread, force interleaving of their execution to ensure the
// retry strategy yields the thread in between retries
final var barrier = new LinkedTransferQueue<Boolean>();
final var future = new CompletableFuture<Void>();
final var secondActor = new ControllableActor();
schedulerRule.submitActor(test.actor);
schedulerRule.submitActor(secondActor);

// when
test.strategy.runWithRetry(
() -> {
// capture the result before to ensure we're looping
final boolean shouldRetry = !future.isDone();
barrier.offer(true);
return shouldRetry;
});
// toggle the retry strategy to stop retrying, letting workUntilDone finish
secondActor.run(
() -> {
// wait until the test actor ran at least once, guaranteeing it's currently looping
// and retrying
barrier.poll();
future.complete(null);
});
// wrap workUntilDone in a timeout condition, as otherwise the test hangs forever there if the
// actors are not yielding
Awaitility.await("workUntilDone should be finite if each actor yields the thread")
.atMost(Duration.ofSeconds(30))
.untilAsserted(schedulerRule::workUntilDone);

// then
assertThat(future)
.as("future is completed iff second actor can run")
.succeedsWithin(Duration.ofSeconds(2));
}

private record TestCase<T extends RetryStrategy>(ControllableActor actor, T strategy) {

// used to generate test cases in conjunction with @ValueSource
// https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests-argument-conversion-implicit
@SuppressWarnings("unused")
static TestCase<?> of(final String type) {
return switch (type.toLowerCase()) {
case "endless" -> TestCase.of(EndlessRetryStrategy::new);
case "recoverable" -> TestCase.of(RecoverableRetryStrategy::new);
case "abortable" -> TestCase.of(AbortableRetryStrategy::new);
case "backoff" -> TestCase.of(actor -> new BackOffRetryStrategy(actor, Duration.ZERO));
default -> throw new IllegalArgumentException(
"Expected one of ['endless', 'recoverable', 'abortable', or 'backoff'], but got "
+ type);
};
}

private static <T extends RetryStrategy> TestCase<T> of(
final Function<ActorControl, T> provider) {
final var actor = new ControllableActor();
final var strategy = provider.apply(actor.getActor());
return new TestCase<>(actor, strategy);
}
}

private static final class ControllableActor extends Actor {
public ActorControl getActor() {
return actor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,27 @@
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import java.util.Objects;
import java.util.function.Consumer;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

public class ControlledActorSchedulerExtension implements BeforeEachCallback, AfterEachCallback {

private final Consumer<ActorSchedulerBuilder> configurator;

private ActorScheduler actorScheduler;
private ControlledActorThread controlledActorTaskRunner;

public ControlledActorSchedulerExtension() {
this(builder -> {});
}

public ControlledActorSchedulerExtension(final Consumer<ActorSchedulerBuilder> configurator) {
this.configurator = Objects.requireNonNull(configurator, "must specify a configurator");
}

@Override
public void afterEach(final ExtensionContext extensionContext) throws Exception {
actorScheduler.stop();
Expand All @@ -45,6 +57,7 @@ public void beforeEach(final ExtensionContext extensionContext) throws Exception
.setActorThreadFactory(actorTaskRunnerFactory)
.setActorTimerQueue(timerQueue);

configurator.accept(builder);
actorScheduler = builder.build();
controlledActorTaskRunner = actorTaskRunnerFactory.controlledThread;
actorScheduler.start();
Expand Down

0 comments on commit 4ef03a8

Please sign in to comment.