Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist snapshot metadata #10121

Merged
merged 20 commits into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c76d433
refactor(snapshot): rename getMetadata to getSnapshotId
deepthidevaki Aug 18, 2022
960c5c0
refactor(snapshot): rename FileBasedSnapshotMetadata to FileBasedSnap…
deepthidevaki Aug 18, 2022
b2b82a9
refactor(snapshot): add new SnapshotMetadata to PersistedSnapshot
deepthidevaki Aug 18, 2022
a01eb95
refactor(snapshots): allow specifying lastWrittenPostion before persi…
deepthidevaki Aug 18, 2022
acdcae9
feat(snapshots): persist metadata as a file in the snapshot
deepthidevaki Aug 18, 2022
021e0e7
feat(snapshots): metadata is read when loading snapshot
deepthidevaki Aug 18, 2022
01ce37c
refactor(snapshot): static metadata file name
deepthidevaki Aug 18, 2022
8b01094
test(snapshots): verify metadata is persisted
deepthidevaki Aug 18, 2022
292f83d
feat(snapshots): allow sending metadata in replicated snapshot
deepthidevaki Aug 18, 2022
23878d4
test(snapshots): merge SnapshotChunkReaderTest to FileBasedSnapshotCh…
deepthidevaki Aug 19, 2022
be38e07
feat(snapshots): when calculating checksum the metadata file should b…
deepthidevaki Aug 19, 2022
977cfda
test(snapshots): fix assertion
deepthidevaki Aug 19, 2022
379390e
refactor(snapshots): implement toString in SfvChecksum
deepthidevaki Aug 19, 2022
5cbefec
refactor(snapshots): fix typos and toString
deepthidevaki Aug 22, 2022
5636691
refactor(snapshots): rename old metadata with snapshotId
deepthidevaki Aug 22, 2022
065524b
refactor(snapshots): document the new interface methods
deepthidevaki Aug 22, 2022
b4a7a65
refactor(snapshots): use filechannel to write metadata
deepthidevaki Aug 22, 2022
3c7aeeb
test(snapshots): fix typo
deepthidevaki Aug 22, 2022
6064efd
refactor(snapshots): improve documentation
deepthidevaki Aug 22, 2022
1c6d09d
refactor(snapshot): use specific exception
deepthidevaki Aug 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.camunda.zeebe.snapshots.SnapshotChunk;
import io.camunda.zeebe.snapshots.SnapshotChunkReader;
import io.camunda.zeebe.snapshots.SnapshotId;
import io.camunda.zeebe.snapshots.SnapshotMetadata;
import io.camunda.zeebe.snapshots.SnapshotReservation;
import io.camunda.zeebe.util.StringUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
Expand Down Expand Up @@ -149,6 +150,11 @@ public long getChecksum() {
return 0;
}

/**
* Stub implementation: no metadata is available here, so callers always receive {@code null}.
*/
@Override
public SnapshotMetadata getMetadata() {
return null;
}

@Override
public ActorFuture<SnapshotReservation> reserve() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotMetadata;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotId;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -158,8 +158,8 @@ private void getFollowerPartitionStatus(
final var snapshotId = getSnapshotId(partition);
final var processedPositionInSnapshot =
snapshotId
.flatMap(FileBasedSnapshotMetadata::ofFileName)
.map(FileBasedSnapshotMetadata::getProcessedPosition)
.flatMap(FileBasedSnapshotId::ofFileName)
.map(FileBasedSnapshotId::getProcessedPosition)
.orElse(null);
final var status =
PartitionStatus.ofFollower(snapshotId.orElse(null), processedPositionInSnapshot);
Expand All @@ -179,8 +179,8 @@ private void getLeaderPartitionStatus(
final var snapshotId = getSnapshotId(partition);
final var processedPositionInSnapshot =
snapshotId
.flatMap(FileBasedSnapshotMetadata::ofFileName)
.map(FileBasedSnapshotMetadata::getProcessedPosition)
.flatMap(FileBasedSnapshotId::ofFileName)
.map(FileBasedSnapshotId::getProcessedPosition)
.orElse(null);

actor.runOnCompletion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ private void persistSnapshotIfLastWrittenPositionCommitted() {
commitPosition,
lastWrittenPosition,
pendingSnapshot);
final var snapshotPersistFuture = pendingSnapshot.persist();
final var snapshotPersistFuture =
pendingSnapshot.withLastFollowupEventPosition(lastWrittenPosition).persist();

snapshotPersistFuture.onComplete(
(snapshot, persistError) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.ZeebeClientBuilder;
import io.camunda.zeebe.snapshots.SnapshotId;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotMetadata;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotId;
import io.netty.util.NetUtil;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -100,6 +100,6 @@ private SnapshotId waitForSnapshotAtBroker(
.getProcessedPositionInSnapshot())
.isNotNull());
final PartitionStatus partitionStatus = brokerAdminService.getPartitionStatus().get(1);
return FileBasedSnapshotMetadata.ofFileName(partitionStatus.getSnapshotId()).get();
return FileBasedSnapshotId.ofFileName(partitionStatus.getSnapshotId()).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotException.StateClosedException;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotMetadata;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotId;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStore;
import io.camunda.zeebe.snapshots.impl.SnapshotMetrics;
import io.camunda.zeebe.test.util.AutoCloseableRule;
Expand Down Expand Up @@ -193,9 +193,8 @@ public void shouldTakeSnapshotWhenExporterPositionNotChanged() {
.extracting(PersistedSnapshot::getCompactionBound)
.isEqualTo(firstSnapshot.getCompactionBound());
assertThat(snapshot.getId()).isNotEqualTo(firstSnapshot.getId());
final var newSnapshotId = FileBasedSnapshotMetadata.ofFileName(snapshot.getId()).orElseThrow();
final var firstSnapshotId =
FileBasedSnapshotMetadata.ofFileName(firstSnapshot.getId()).orElseThrow();
final var newSnapshotId = FileBasedSnapshotId.ofFileName(snapshot.getId()).orElseThrow();
final var firstSnapshotId = FileBasedSnapshotId.ofFileName(firstSnapshot.getId()).orElseThrow();
assertThat(firstSnapshotId).isLessThan(newSnapshotId);
}

Expand All @@ -218,9 +217,8 @@ public void shouldTakeSnapshotWhenProcessorPositionNotChanged() {
.extracting(PersistedSnapshot::getCompactionBound)
.isEqualTo(firstSnapshot.getCompactionBound());
assertThat(snapshot.getId()).isNotEqualTo(firstSnapshot.getId());
final var newSnapshotId = FileBasedSnapshotMetadata.ofFileName(snapshot.getId()).orElseThrow();
final var firstSnapshotId =
FileBasedSnapshotMetadata.ofFileName(firstSnapshot.getId()).orElseThrow();
final var newSnapshotId = FileBasedSnapshotId.ofFileName(snapshot.getId()).orElseThrow();
final var firstSnapshotId = FileBasedSnapshotId.ofFileName(firstSnapshot.getId()).orElseThrow();
assertThat(firstSnapshotId).isLessThan(newSnapshotId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.snapshots.SnapshotId;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotMetadata;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotId;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
Expand Down Expand Up @@ -813,7 +813,7 @@ private Optional<SnapshotId> getSnapshot(final Broker broker, final int partitio

return Optional.ofNullable(partitionStatus)
.map(PartitionStatus::getSnapshotId)
.flatMap(FileBasedSnapshotMetadata::ofFileName);
.flatMap(FileBasedSnapshotId::ofFileName);
}

public Optional<SnapshotId> getSnapshot(final int brokerId) {
Expand Down
10 changes: 10 additions & 0 deletions snapshot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-scheduler</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public interface PersistedSnapshot extends CloseableSilently {
*/
long getChecksum();

/**
* Returns the {@link SnapshotMetadata} of this snapshot, i.e. the additional information related
* to the snapshot.
*
* @return the metadata of the snapshot.
*/
SnapshotMetadata getMetadata();
/**
* Reserves this snapshot. When the snapshot is reserved, it is not deleted until it is released.
* The reservation status is not persisted. After a restart the snapshot will be in state
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.snapshots;

/**
* Information related to a snapshot: its version, the processed and exported positions, and the
* position of the last follow-up event that the logstream must contain for the snapshot to be
* usable.
*/
public interface SnapshotMetadata {
deepthidevaki marked this conversation as resolved.
Show resolved Hide resolved

/**
* @return version of the snapshot
*/
int version();

/**
* @return processed position in the snapshot (same as in {@link SnapshotId})
*/
long processedPosition();

/**
* @return exported position in the snapshot (same as in {@link SnapshotId})
*/
long exportedPosition();

/**
* A snapshot is only valid if the logstream consists of the events from the processedPosition up
* to the follow-up event position.
*
* <p>NOTE(review): when a received snapshot carries no metadata chunk, the file-based receiver
* falls back to {@code Long.MAX_VALUE} for this value (backward compatibility), so consumers
* should not assume it always refers to an existing position.
*
* @return position of the last follow-up event that must be in the logstream to ensure that the
* system can recover from the snapshot and the logstream.
*/
long lastFollowupEventPosition();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,15 @@ public interface TransientSnapshot extends PersistableSnapshot {
* @return true on success, false otherwise
*/
ActorFuture<Void> take(Consumer<Path> takeSnapshot);

/**
* Specifies the position of the last follow-up event that belongs to this snapshot.
*
* <p>A snapshot is only valid if the accompanying logstream has events from processedPosition up
* to the last follow-up event position. The last follow-up event position is the position of an
* event whose source position {@code >=} actual processed position in the state.
*
* @param followupEventPosition position of the follow-up event which must be in the logstream to
* ensure that the system can recover from the snapshot and the events in the logstream.
* @return a transient snapshot on which further calls (for example {@code persist()}) can be
* chained.
*/
TransientSnapshot withLastFollowupEventPosition(long followupEventPosition);
deepthidevaki marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ public class FileBasedReceivedSnapshot implements ReceivedSnapshot {
private final ActorControl actor;
private final FileBasedSnapshotStore snapshotStore;

private final FileBasedSnapshotMetadata metadata;
private final FileBasedSnapshotId snapshotId;
private long expectedSnapshotChecksum;
private int expectedTotalCount;
private FileBasedSnapshotMetadata metadata;

FileBasedReceivedSnapshot(
final FileBasedSnapshotMetadata metadata,
final FileBasedSnapshotId snapshotId,
final Path directory,
final FileBasedSnapshotStore snapshotStore,
final ActorControl actor) {
this.metadata = metadata;
this.snapshotId = snapshotId;
this.snapshotStore = snapshotStore;
this.directory = directory;
this.actor = actor;
Expand All @@ -54,7 +55,7 @@ public class FileBasedReceivedSnapshot implements ReceivedSnapshot {

@Override
public long index() {
return metadata.getIndex();
return snapshotId.getIndex();
}

@Override
Expand Down Expand Up @@ -113,6 +114,18 @@ private void applyInternal(final SnapshotChunk snapshotChunk) throws SnapshotWri

LOGGER.trace("Consume snapshot snapshotChunk {} of snapshot {}", chunkName, snapshotId);
writeReceivedSnapshotChunk(snapshotChunk, snapshotFile);

if (snapshotChunk.getChunkName().equals(FileBasedSnapshotStore.METADATA_FILE_NAME)) {
try {
collectMetadata(snapshotChunk.getContent());
} catch (final IOException e) {
throw new SnapshotWriteException("Cannot decode snapshot metadata");
}
}
}

/**
* Decodes the serialized snapshot metadata and stores it in the {@code metadata} field.
*
* @param content raw bytes of the received metadata chunk
* @throws IOException if the content cannot be decoded
*/
private void collectMetadata(final byte[] content) throws IOException {
final FileBasedSnapshotMetadata decodedMetadata = FileBasedSnapshotMetadata.decode(content);
this.metadata = decodedMetadata;
}

private void checkChunkChecksumIsValid(
Expand Down Expand Up @@ -157,18 +170,18 @@ private void checkTotalCountIsValid(final int currentTotalCount) throws Snapshot
}

private void checkSnapshotIdIsValid(final String snapshotId) throws SnapshotWriteException {
final var receivedSnapshotId = FileBasedSnapshotMetadata.ofFileName(snapshotId);
final var receivedSnapshotId = FileBasedSnapshotId.ofFileName(snapshotId);
if (receivedSnapshotId.isEmpty()) {
throw new SnapshotWriteException(
String.format("Snapshot file name '%s' has unexpected format", snapshotId));
}

final FileBasedSnapshotMetadata chunkMetadata = receivedSnapshotId.get();
if (metadata.compareTo(chunkMetadata) != 0) {
final FileBasedSnapshotId chunkSnapshotId = receivedSnapshotId.get();
if (this.snapshotId.compareTo(chunkSnapshotId) != 0) {
throw new SnapshotWriteException(
String.format(
"Expected snapshot chunk metadata to match metadata '%s' but was '%s' instead",
metadata, chunkMetadata));
"Expected snapshot id in chunk to be '%s' but was '%s' instead",
this.snapshotId, chunkSnapshotId));
}
}

Expand Down Expand Up @@ -213,7 +226,7 @@ public ActorFuture<PersistedSnapshot> persist() {

@Override
public SnapshotId snapshotId() {
return metadata;
return snapshotId;
}

@Override
Expand All @@ -233,7 +246,7 @@ private void abortInternal() {
}

private void persistInternal(final CompletableActorFuture<PersistedSnapshot> future) {
if (snapshotStore.hasSnapshotId(metadata.getSnapshotIdAsString())) {
if (snapshotStore.hasSnapshotId(snapshotId.getSnapshotIdAsString())) {
abortInternal();
future.complete(snapshotStore.getLatestSnapshot().orElseThrow());
return;
Expand All @@ -257,8 +270,17 @@ private void persistInternal(final CompletableActorFuture<PersistedSnapshot> fut
}

try {
if (metadata == null) {
// backward compatibility
metadata =
new FileBasedSnapshotMetadata(
FileBasedSnapshotStore.VERSION,
snapshotId.getProcessedPosition(),
snapshotId.getExportedPosition(),
Long.MAX_VALUE);
}
final PersistedSnapshot value =
snapshotStore.newSnapshot(metadata, directory, expectedSnapshotChecksum);
snapshotStore.newSnapshot(snapshotId, directory, expectedSnapshotChecksum, metadata);
future.complete(value);
} catch (final Exception e) {
future.completeExceptionally(e);
Expand All @@ -275,7 +297,7 @@ public String toString() {
+ ", snapshotStore="
+ snapshotStore.getName()
+ ", metadata="
+ metadata
+ snapshotId
+ '}';
}
}
Loading