Skip to content

Commit

Permalink
merge: #10812
Browse files Browse the repository at this point in the history
10812: test: handle temporarily missing snapshots more gracefully r=oleschoenburg a=oleschoenburg

After triggering the snapshot it's not necessarily immediately available so with this change, we simply wait for the snapshot to be available before trying to parse the snapshot name. Previously, the parsing could fail with an NPE.

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and lenaschoenburg committed Oct 27, 2022
2 parents 4f768f7 + 89650fb commit 2854485
Showing 1 changed file with 24 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
import io.zeebe.containers.ZeebeVolume;
import io.zeebe.containers.exporter.DebugReceiver;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -57,12 +58,16 @@ void shouldIncludeExportedPositionInSnapshot() {
final var snapshotId =
Awaitility.await("Snapshot is taken")
.atMost(Duration.ofSeconds(60))
.until(() -> partitions.query().get(1).snapshotId(), Objects::nonNull);
.until(
() ->
Optional.ofNullable(partitions.query().get(1).snapshotId())
.flatMap(FileBasedSnapshotId::ofFileName),
Optional::isPresent)
.orElseThrow();

// then
final var exportedPositionInSnapshot =
FileBasedSnapshotId.ofFileName(snapshotId).orElseThrow().getExportedPosition();
assertThat(exportedPositionInSnapshot).isEqualTo(exporterPosition);
assertThat(snapshotId)
.returns(exporterPosition, FileBasedSnapshotId::getExportedPosition);
}
}
}
Expand Down Expand Up @@ -100,11 +105,13 @@ void shouldTakeSnapshotWhenExporterPositionIsMinusOne() {
.during(Duration.ofSeconds(5))
.until(
() ->
FileBasedSnapshotId.ofFileName(partitions.query().get(1).snapshotId())
.orElseThrow(),
hasStableValue());
Optional.ofNullable(partitions.query().get(1).snapshotId())
.flatMap(FileBasedSnapshotId::ofFileName),
hasStableValue())
.orElseThrow();

assertThat(snapshotWithExporters).returns(0L, FileBasedSnapshotId::getExportedPosition);
Assertions.assertThat(snapshotWithExporters)
.returns(0L, FileBasedSnapshotId::getExportedPosition);
}
}
}
Expand Down Expand Up @@ -134,12 +141,12 @@ void shouldNotTakeNewSnapshotWhenAddingNewExporter() {
snapshotWithoutExporters =
Awaitility.await("Snapshot is taken")
.atMost(Duration.ofSeconds(60))
.during(Duration.ofSeconds(5))
.until(
() ->
FileBasedSnapshotId.ofFileName(partitions.query().get(1).snapshotId())
.orElseThrow(),
hasStableValue());
Optional.ofNullable(partitions.query().get(1).snapshotId())
.flatMap(FileBasedSnapshotId::ofFileName),
Optional::isPresent)
.orElseThrow();
}

// when -- taking snapshot on broker with exporters configured
Expand All @@ -160,9 +167,10 @@ void shouldNotTakeNewSnapshotWhenAddingNewExporter() {
.during(Duration.ofSeconds(5))
.until(
() ->
FileBasedSnapshotId.ofFileName(partitions.query().get(1).snapshotId())
.orElseThrow(),
hasStableValue());
Optional.ofNullable(partitions.query().get(1).snapshotId())
.flatMap(FileBasedSnapshotId::ofFileName),
hasStableValue())
.orElseThrow();
}
}

Expand Down

0 comments on commit 2854485

Please sign in to comment.