Skip to content

Commit

Permalink
merge: #13938
Browse files Browse the repository at this point in the history
13938: Speed up state controller recovery from existing snapshot r=npepinpe a=npepinpe

## Description

This PR greatly improves the state recovery time from an existing snapshot, especially when the state grows to be very large. This is done by leveraging RocksDB checkpoint capabilities, which create hard-links for the immutable SST files if the source and target directories are on the same filesystem. This has the additional benefit of reducing disk space usage in most cases.

> **Note**
> When the `runtime` and `snapshot` directories are not on the same filesystem, this will end up doing a normal copy, which is as fast as the previous solution.

## Related issues

closes #13775 



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and npepinpe committed Aug 21, 2023
2 parents e81a12f + 932af9f commit d4b846f
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.atomix.cluster;

import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
import io.camunda.zeebe.snapshots.ReceivableSnapshotStore;
Expand Down Expand Up @@ -78,12 +77,6 @@ public Path getPath() {
return null;
}

/**
 * Stub implementation: performs no copy and ignores both arguments.
 *
 * @param snapshot unused
 * @param targetDirectory unused
 * @return the future produced by {@code CompletableActorFuture.completed(null)}
 */
@Override
public ActorFuture<Void> copySnapshot(
final PersistedSnapshot snapshot, final Path targetDirectory) {
return CompletableActorFuture.completed(null);
}

@Override
public ReceivedSnapshot newReceivedSnapshot(final String snapshotId) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,6 @@ public Path getPath() {
return null;
}

/**
 * Stub implementation: performs no copy and ignores both arguments.
 *
 * <p>NOTE(review): this returns {@code null} instead of a future, so any caller that joins or
 * registers a callback on the result would fail with a {@code NullPointerException}. Presumably
 * no user of this store ever calls it - confirm, or return a completed future instead.
 *
 * @param snapshot unused
 * @param targetDirectory unused
 * @return always {@code null}
 */
@Override
public ActorFuture<Void> copySnapshot(
final PersistedSnapshot snapshot, final Path targetDirectory) {
return null;
}

@Override
public ReceivedSnapshot newReceivedSnapshot(final String snapshotId) {
final var newSnapshot = new InMemorySnapshot(this, snapshotId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import io.camunda.zeebe.broker.system.partitions.NoEntryAtSnapshotPosition;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.db.ZeebeDbException;
import io.camunda.zeebe.db.ZeebeDbFactory;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotException.StateClosedException;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.util.FileUtil;
Expand All @@ -25,32 +27,30 @@
import org.slf4j.Logger;

/** Controls how snapshot/recovery operations are performed */
@SuppressWarnings("rawtypes")
public class StateControllerImpl implements StateController {

private static final Logger LOG = Loggers.SNAPSHOT_LOGGER;

private final Path runtimeDirectory;

@SuppressWarnings("rawtypes")
private final ZeebeDbFactory zeebeDbFactory;

@SuppressWarnings("rawtypes")
private final ToLongFunction<ZeebeDb> exporterPositionSupplier;

private final AtomixRecordEntrySupplier entrySupplier;

@SuppressWarnings("rawtypes")
private ZeebeDb db;

private final ConstructableSnapshotStore constructableSnapshotStore;
private final ConcurrencyControl concurrencyControl;

public StateControllerImpl(
@SuppressWarnings("rawtypes") final ZeebeDbFactory zeebeDbFactory,
final ZeebeDbFactory zeebeDbFactory,
final ConstructableSnapshotStore constructableSnapshotStore,
final Path runtimeDirectory,
final AtomixRecordEntrySupplier entrySupplier,
@SuppressWarnings("rawtypes") final ToLongFunction<ZeebeDb> exporterPositionSupplier,
final ToLongFunction<ZeebeDb> exporterPositionSupplier,
final ConcurrencyControl concurrencyControl) {
this.constructableSnapshotStore = constructableSnapshotStore;
this.runtimeDirectory = runtimeDirectory;
Expand Down Expand Up @@ -108,26 +108,22 @@ private void recoverInternal(final ActorFuture<ZeebeDb> future) {
"Failed to delete runtime folder. Cannot recover from snapshot.", e));
}

final var optLatestSnapshot = constructableSnapshotStore.getLatestSnapshot();
if (optLatestSnapshot.isPresent()) {
final var snapshot = optLatestSnapshot.get();
LOG.debug("Recovering state from available snapshot: {}", snapshot);
constructableSnapshotStore
.copySnapshot(snapshot, runtimeDirectory)
.onComplete(
(ok, error) -> {
if (error != null) {
future.completeExceptionally(
new RuntimeException(
String.format("Failed to recover from snapshot %s", snapshot.getId()),
error));
} else {
openDb(future);
}
});
} else {
// If there is no snapshot, open empty database
openDb(future);
constructableSnapshotStore
.getLatestSnapshot()
.ifPresent(snapshot -> recoverFromSnapshot(future, snapshot));
openDb(future);
}

/**
 * Populates the runtime directory from the given persisted snapshot.
 *
 * <p>The snapshot directory is opened through {@code zeebeDbFactory.openSnapshotOnlyDb} and a
 * snapshot of that database is then created in {@code runtimeDirectory}; the temporary database
 * is closed again once this is done.
 *
 * <p>This method never throws: any exception raised while opening, snapshotting or closing is
 * wrapped in a {@link ZeebeDbException} and used to complete {@code future} exceptionally. On
 * success the future is left untouched. The method returns normally in both cases, so callers
 * cannot tell from the call itself whether recovery succeeded.
 *
 * @param future the recovery future, completed exceptionally on failure only
 * @param snapshot the persisted snapshot to recover from
 */
private void recoverFromSnapshot(
    final ActorFuture<ZeebeDb> future, final PersistedSnapshot snapshot) {
  LOG.debug("Recovering state from available snapshot: {}", snapshot);

  // named differently from the `db` field on purpose, to avoid shadowing it
  try (final var snapshotDb = zeebeDbFactory.openSnapshotOnlyDb(snapshot.getPath().toFile())) {
    final var runtimeFolder = runtimeDirectory.toFile();
    snapshotDb.createSnapshot(runtimeFolder);
  } catch (final Exception failure) {
    final var message =
        String.format("Failed to recover from snapshot %s", snapshot.getId());
    future.completeExceptionally(new ZeebeDbException(message, failure));
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class LargeStateControllerPerformanceTest {
private static final double SIZE_GB =
Double.parseDouble(
System.getenv().getOrDefault("LARGE_STATE_CONTROLLER_PERFORMANCE_TEST_SIZE_GB", "0.5"));
private static final Map<Double, Double> KNOWN_REFERENCE_SCORES = Map.of(0.5, 3.2, 4.0, 0.55);
private static final Map<Double, Double> KNOWN_REFERENCE_SCORES = Map.of(0.5, 10.0, 4.0, 9.0);

private TestState.TestContext context;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@
import java.util.concurrent.ThreadLocalRandom;
import org.agrona.CloseHelper;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class TestState {
private static final Logger LOGGER = LoggerFactory.getLogger(TestState.class);

private static final int BATCH_INSERT_SIZE = 10_000;
private static final int KEY_VALUE_SIZE = 8096;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,4 @@ public interface PersistedSnapshotStore extends CloseableSilently {
ActorFuture<Void> delete();

Path getPath();

/**
 * Copies the files of the given snapshot into the given directory.
 *
 * @param snapshot the snapshot to be copied
 * @param targetDirectory the directory to which the snapshot files will be copied
 * @return a future which will be completed exceptionally if the snapshot was not copied
 * successfully
 */
ActorFuture<Void> copySnapshot(PersistedSnapshot snapshot, Path targetDirectory);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.FileUtil;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AtomicMoveNotSupportedException;
Expand Down Expand Up @@ -345,35 +344,6 @@ public Path getPath() {
return snapshotsDirectory;
}

/**
 * Copies the files of the given snapshot into {@code targetDirectory}.
 *
 * <p>The work is submitted through {@code actor.run}; its outcome is reported via the returned
 * future. If the snapshot's path does not exist, the future is completed exceptionally with a
 * {@code FileNotFoundException}; if {@code FileUtil.copySnapshot} throws, the future is
 * completed exceptionally with that exception.
 *
 * @param snapshot the snapshot to be copied
 * @param targetDirectory the directory to which the snapshot files will be copied
 * @return a future completed with {@code null} on success, exceptionally otherwise
 */
@Override
public ActorFuture<Void> copySnapshot(
    final PersistedSnapshot snapshot, final Path targetDirectory) {
  final CompletableActorFuture<Void> copyFuture = new CompletableActorFuture<>();

  actor.run(
      () -> {
        // guard clause: there is nothing to copy when the snapshot directory is gone
        if (!Files.exists(snapshot.getPath())) {
          final var missingMessage =
              String.format(
                  "Expected to copy snapshot %s to directory %s, but snapshot directory %s does not exists. Snapshot may have been deleted.",
                  snapshot.getId(), targetDirectory, snapshot.getPath());
          copyFuture.completeExceptionally(missingMessage, new FileNotFoundException());
          return;
        }

        try {
          FileUtil.copySnapshot(snapshot.getPath(), targetDirectory);
          copyFuture.complete(null);
        } catch (final Exception copyFailure) {
          final var failureMessage =
              String.format(
                  "Failed to copy snapshot %s to directory %s.",
                  snapshot.getId(), targetDirectory);
          copyFuture.completeExceptionally(failureMessage, copyFailure);
        }
      });

  return copyFuture;
}

@Override
public FileBasedReceivedSnapshot newReceivedSnapshot(final String snapshotId) {
final var optSnapshotId = FileBasedSnapshotId.ofFileName(snapshotId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.test.util.asserts.DirectoryAssert;
import io.camunda.zeebe.util.FileUtil;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
Expand Down Expand Up @@ -242,49 +240,6 @@ public void shouldPurgePendingSnapshots() {
assertThat(pendingSnapshotsDir).isEmptyDirectory();
}

/** Copying a persisted snapshot must produce a target directory listing the same file names. */
@Test
public void shouldCopySnapshot() {
  // given - a persisted snapshot and a "runtime" path below the root directory
  final var snapshot = (FileBasedSnapshot) takeTransientSnapshot().persist().join();
  final var runtimePath = rootDirectory.resolve("runtime");

  // when
  snapshotStore.copySnapshot(snapshot, runtimePath).join();

  // then - the target is populated with exactly the snapshot's entries
  assertThat(runtimePath).isNotEmptyDirectory();
  final String[] copiedNames = runtimePath.toFile().list();
  final String[] snapshotNames = snapshot.getDirectory().toFile().list();
  assertThat(copiedNames).containsExactlyInAnyOrder(snapshotNames);
}

/** The copy future must fail when the target path is already occupied by a file. */
@Test
public void shouldCompleteWithExceptionWhenCannotCopySnapshot() throws IOException {
  // given - a persisted snapshot, and a regular file created at the target path
  final var snapshot = (FileBasedSnapshot) takeTransientSnapshot().persist().join();
  final var occupiedTarget = rootDirectory.resolve("runtime");
  occupiedTarget.toFile().createNewFile();

  // when
  final var copyResult = snapshotStore.copySnapshot(snapshot, occupiedTarget);

  // then - should fail because targetDirectory already exists
  assertThatThrownBy(() -> copyResult.join())
      .hasCauseInstanceOf(FileAlreadyExistsException.class);
}

/** The copy future must fail when the snapshot was deleted before the copy was requested. */
@Test
public void shouldCompleteWithExceptionWhenCopyingIfSnapshotDoesNotExists() {
  // given - a persisted snapshot and a "runtime" path below the root directory
  final var snapshot = (FileBasedSnapshot) takeTransientSnapshot().persist().join();
  final var runtimePath = rootDirectory.resolve("runtime");

  // when - the snapshot is deleted first, then copied
  snapshot.delete();
  final var copyResult = snapshotStore.copySnapshot(snapshot, runtimePath);

  // then
  assertThatThrownBy(() -> copyResult.join()).hasCauseInstanceOf(FileNotFoundException.class);
}

@Test
public void shouldDeleteSnapshot() {
// given
Expand Down

0 comments on commit d4b846f

Please sign in to comment.