Skip to content

Commit

Permalink
merge: #10867
Browse files Browse the repository at this point in the history
10867: Unflake timer start event test r=remcowesterhoud a=korthout

## Description

<!-- Please explain the changes you made here. -->

Most cases in the timer start event test were suffering from flakiness. This stabilizes these tests again.

The cause of the flakiness was a race condition between time traveling and scheduling an upcoming timer:
 - the time would be increased after the deployment is completed
 - that includes writing all the deployment events (up to and including fully_distributed)
 - it does not await post-commit task execution
 - upcoming timers are scheduled in a post-commit task
 - this post-commit task could be executed after the time traveling
 - in that case the timer does not trigger after the time travel, as it is scheduled in the future

This changes the time travel to guarantee that the post-commit tasks have been executed. It does that by awaiting that the engine reaches the end of the log.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #10272 



Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
3 people committed Nov 2, 2022
2 parents 4498022 + 5c467e4 commit f93a7e1
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,6 @@ public void shouldTriggerTimer() {
.withProcessInstanceKey(processInstanceKey)
.getFirst();

ENGINE.increaseTime(Duration.ofSeconds(1));

// then
final Record<TimerRecordValue> triggeredEvent =
RecordingExporter.timerRecords(TimerIntent.TRIGGERED)
Expand All @@ -215,7 +213,7 @@ public void shouldTriggerTimer() {
assertThat(triggeredEvent.getKey()).isEqualTo(createdEvent.getKey());
assertThat(triggeredEvent.getValue()).isEqualTo(createdEvent.getValue());
assertThat(Duration.ofMillis(triggeredEvent.getTimestamp() - createdEvent.getTimestamp()))
.isGreaterThanOrEqualTo(Duration.ofSeconds(1));
.isBetween(Duration.ofMillis(100), Duration.ofMillis(150));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -1090,45 +1089,28 @@ public void shouldTriggerOnlyTwice() {
// when
engine.increaseTime(Duration.ofSeconds(5));

// disable because we'll await with Awaitility
RecordingExporter.disableAwaitingIncomingRecords();

// then
Awaitility.await()
.untilAsserted(
() -> {
// due timers are only checked if the engine is not currently processing #10112
// so we may need to move the clock slightly for each check
engine.increaseTime(Duration.ofMillis(100));
assertThat(RecordingExporter.timerRecords(TimerIntent.TRIGGERED).limit(4))
.extracting(record -> record.getValue().getProcessDefinitionKey())
.containsExactly(
firstDeploymentProcessDefinitionKey,
secondDeploymentProcessDefinitionKey,
firstDeploymentProcessDefinitionKey,
secondDeploymentProcessDefinitionKey);
});
assertThat(RecordingExporter.timerRecords(TimerIntent.TRIGGERED).limit(4))
.extracting(record -> record.getValue().getProcessDefinitionKey())
.containsExactly(
firstDeploymentProcessDefinitionKey,
secondDeploymentProcessDefinitionKey,
firstDeploymentProcessDefinitionKey,
secondDeploymentProcessDefinitionKey);

// when
engine.increaseTime(Duration.ofSeconds(10));

// then
Awaitility.await()
.untilAsserted(
() -> {
// due timers are only checked if the engine is not currently processing #10112
// so we may need to move the clock slightly for each check
engine.increaseTime(Duration.ofMillis(100));
assertThat(RecordingExporter.timerRecords(TimerIntent.TRIGGERED).limit(5))
.describedAs("Expect that start_1 triggered twice and start_2 triggered thrice")
.extracting(record -> record.getValue().getProcessDefinitionKey())
.containsExactly(
firstDeploymentProcessDefinitionKey,
secondDeploymentProcessDefinitionKey,
firstDeploymentProcessDefinitionKey,
secondDeploymentProcessDefinitionKey,
secondDeploymentProcessDefinitionKey);
});
assertThat(RecordingExporter.timerRecords(TimerIntent.TRIGGERED).limit(5))
.describedAs("Expect that start_1 triggered twice and start_2 triggered thrice")
.extracting(record -> record.getValue().getProcessDefinitionKey())
.containsExactly(
firstDeploymentProcessDefinitionKey,
secondDeploymentProcessDefinitionKey,
firstDeploymentProcessDefinitionKey,
secondDeploymentProcessDefinitionKey,
secondDeploymentProcessDefinitionKey);
}

@Test
Expand Down Expand Up @@ -1284,11 +1266,10 @@ public void shouldAvoidTriggeringMultipleTimes() {
final long processDefinitionKey = deployedProcess.getProcessDefinitionKey();

// when
engine.stop();
final long engineStoppedTime = engine.getClock().getCurrentTimeInMillis();
engine.forEachPartition(engine::pauseProcessing);
final long enginePausedTime = engine.getClock().getCurrentTimeInMillis();
engine.increaseTime(Duration.ofMinutes(35));
RecordingExporter.reset();
engine.start();
engine.forEachPartition(engine::resumeProcessing);

// then
final Record<TimerRecordValue> firstRecord =
Expand All @@ -1301,7 +1282,7 @@ public void shouldAvoidTriggeringMultipleTimes() {
.hasTargetElementId("start")
.hasElementInstanceKey(TimerInstance.NO_ELEMENT_INSTANCE);

assertThat(firstRecord.getTimestamp()).isGreaterThan(engineStoppedTime);
assertThat(firstRecord.getTimestamp()).isGreaterThan(enginePausedTime);

final TimerRecordValue secondTimerRecord =
RecordingExporter.timerRecords(TimerIntent.CREATED)
Expand Down
13 changes: 13 additions & 0 deletions engine/src/test/java/io/camunda/zeebe/engine/util/EngineRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessor.Phase;
import io.camunda.zeebe.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.streamprocessor.TypedRecordImpl;
Expand Down Expand Up @@ -233,6 +234,18 @@ public void forEachPartition(final Consumer<Integer> partitionIdConsumer) {
}

public void increaseTime(final Duration duration) {
final var streamProcessor = environmentRule.getStreamProcessor(PARTITION_ID);
if (streamProcessor.getCurrentPhase().join() == Phase.PROCESSING) {
// When time traveling, we're generally want to make sure that the entire state machine cycle
// for processing a record is completed, including the execution of post-commit tasks. For
// example, we're often interested in scheduled timers when time traveling in tests, for which
// the due date checker is scheduled through a post-commit task. When the engine has reached
// the end of the log, all post-commit tasks have also been applied, because the state machine
// will have executed them before switching the hasReachEnd flag.
Awaitility.await("Expect that engine reaches the end of the log before increasing the time")
.until(this::hasReachedEnd);
}

environmentRule.getClock().addTime(duration);
}

Expand Down

0 comments on commit f93a7e1

Please sign in to comment.