Skip to content

Commit

Permalink
Merge #6428
Browse files Browse the repository at this point in the history
6428: [BACKPORT 0.26] fix(broker): fix race condition in persisting snapshot r=Zelldon a=MiguelPires

## Description

Backports #6383. No changes were made to the PR.

## Related issues

closes #6377

## Definition of Done

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [ ] The changes are backwards compatibility with previous versions
* [ ] If it fixes a bug then PRs are created to [backport](https://github.com/zeebe-io/zeebe/compare/stable/0.24...develop?expand=1&template=backport_template.md&title=[Backport%200.24]) the fix to the last two minor versions. You can trigger a backport by assigning labels (e.g. `backport stable/0.25`) to the PR, in case that fails you need to create backports manually.

Testing:
* [ ] There are unit/integration tests that verify all acceptance criterias of the issue
* [ ] New tests are written to ensure backwards compatibility with further versions
* [ ] The behavior is tested manually
* [ ] The change has been verified by a QA run
* [ ] The impact of the changes is verified by a benchmark 

Documentation: 
* [ ] The documentation is updated (e.g. BPMN reference, configuration, examples, get-started guides, etc.)
* [ ] New content is added to the [release announcement](https://drive.google.com/drive/u/0/folders/1DTIeswnEEq-NggJ25rm2BsDjcCQpDape)


Co-authored-by: Miguel Pires <miguel.pires@camunda.com>
  • Loading branch information
zeebe-bors[bot] and Miguel Pires committed Feb 23, 2021
2 parents b879f4b + b0df37d commit 49290da
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public final class AsyncSnapshotDirector extends Actor {
private TransientSnapshot pendingSnapshot;
private long lowerBoundSnapshotPosition;
private boolean takingSnapshot;
private boolean persistingSnapshot;

public AsyncSnapshotDirector(
final int nodeId,
Expand Down Expand Up @@ -179,6 +180,7 @@ private void takeSnapshot(final long initialCommitPosition) {
LOG.info(
LOG_MSG_WAIT_UNTIL_COMMITTED, endPosition, initialCommitPosition);
lastWrittenEventPosition = endPosition;
persistingSnapshot = false;
persistSnapshotIfLastWrittenPositionCommitted();
} else {
lastWrittenEventPosition = null;
Expand All @@ -198,7 +200,9 @@ private void persistSnapshotIfLastWrittenPositionCommitted() {
(currentCommitPosition, error) -> {
if (pendingSnapshot != null
&& lastWrittenEventPosition != null
&& currentCommitPosition >= lastWrittenEventPosition) {
&& currentCommitPosition >= lastWrittenEventPosition
&& !persistingSnapshot) {
persistingSnapshot = true;

final var snapshotPersistFuture = pendingSnapshot.persist();

Expand All @@ -216,6 +220,7 @@ private void persistSnapshotIfLastWrittenPositionCommitted() {
lastWrittenEventPosition = null;
takingSnapshot = false;
pendingSnapshot = null;
persistingSnapshot = false;
});
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public final class FileBasedTransientSnapshot implements TransientSnapshot {
private final FileBasedSnapshotMetadata metadata;
private final ActorFuture<Boolean> takenFuture = new CompletableActorFuture<>();
private boolean isValid = false;
private PersistedSnapshot snapshot;

FileBasedTransientSnapshot(
final FileBasedSnapshotMetadata metadata,
Expand Down Expand Up @@ -66,6 +67,8 @@ private void takeInternal(final Predicate<Path> takeSnapshot) {
if (!isValid) {
abortInternal();
}

snapshot = null;
takenFuture.complete(isValid);
} catch (final Exception exception) {
LOGGER.warn("Unexpected exception on taking snapshot ({})", metadata, exception);
Expand All @@ -91,6 +94,10 @@ public ActorFuture<PersistedSnapshot> persist() {
final CompletableActorFuture<PersistedSnapshot> future = new CompletableActorFuture<>();
actor.call(
() -> {
if (snapshot != null) {
future.complete(snapshot);
return;
}
if (!takenFuture.isDone()) {
future.completeExceptionally(new IllegalStateException("Snapshot is not taken"));
return;
Expand All @@ -101,7 +108,7 @@ public ActorFuture<PersistedSnapshot> persist() {
return;
}
try {
final var snapshot = snapshotStore.newSnapshot(metadata, directory);
snapshot = snapshotStore.newSnapshot(metadata, directory);
future.complete(snapshot);
} catch (final Exception e) {
future.completeExceptionally(e);
Expand All @@ -118,6 +125,7 @@ public SnapshotId snapshotId() {
private void abortInternal() {
try {
isValid = false;
snapshot = null;
LOGGER.debug("DELETE dir {}", directory);
FileUtil.deleteFolder(directory);
} catch (final IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void shouldAbortAndDeleteTransientSnapshot() {
}

@Test
public void shouldPurgePendingOnStore() throws Exception {
public void shouldPurgePendingOnStore() {
// given
final var index = 1L;
final var term = 0L;
Expand All @@ -153,7 +153,7 @@ public void shouldPurgePendingOnStore() throws Exception {
}

@Test
public void shouldNotDeletePersistedSnapshotOnPurge() throws Exception {
public void shouldNotDeletePersistedSnapshotOnPurge() {
// given
final var index = 1L;
final var term = 0L;
Expand Down Expand Up @@ -451,6 +451,34 @@ public void shouldNotPersistInvalidPendingSnapshot() {
.hasMessageContaining("Snapshot is not valid");
}

@Test
public void shouldPersistIdempotently() {
// given
final var transientSnapshot =
persistedSnapshotStore.newTransientSnapshot(1L, 2L, 3, 4).orElseThrow();
transientSnapshot.take(this::createSnapshotDir).join();
final var firstSnapshot = transientSnapshot.persist().join();
assertSnapshotWasMoved();

// when
final var secondSnapshot = transientSnapshot.persist().join();

// then
assertThat(firstSnapshot).isEqualTo(secondSnapshot);
assertSnapshotWasMoved();
}

private void assertSnapshotWasMoved() {
assertThat(pendingSnapshotsDir.toFile().listFiles()).isEmpty();
final var snapshotDirs = snapshotsDir.toFile().listFiles();
assertThat(snapshotDirs).isNotNull().hasSize(1);
final var snapshotDir = snapshotDirs[0];
assertThat(snapshotDir.listFiles())
.isNotNull()
.extracting(File::getName)
.containsExactlyInAnyOrder("file1.txt");
}

private boolean createSnapshotDir(final Path path) {
try {
FileUtil.ensureDirectoryExists(path);
Expand Down

0 comments on commit 49290da

Please sign in to comment.