Skip to content

Commit

Permalink
Merge pull request #14184 from camunda/ck-reuse-sfv-checksum
Browse files Browse the repository at this point in the history
Reuse already created checksum collections on persist
  • Loading branch information
Zelldon committed Sep 6, 2023
2 parents d2a5a64 + 0638271 commit d74f357
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ public interface MutableChecksumsSFV extends ImmutableChecksumsSFV {
*/
void updateFromFile(final Path filePath) throws IOException;

/**
* Update the checksum collection, and add a new checksum from given bytes, likely a read file or
* soon to be written file.
*
* <p>Useful, if we want to avoid re-reading files etc.
*
* @param fileName the name of the file (which relates to the given bytes), that is used in the
* checksum collection in SFV file format
* @param bytes the bytes for which the checksum should be created
*/
void updateFromBytes(final String fileName, final byte[] bytes);

/**
* Build the checksum collection from a SFV format string array.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class FileBasedReceivedSnapshot implements ReceivedSnapshot {
private long expectedSnapshotChecksum;
private int expectedTotalCount;
private FileBasedSnapshotMetadata metadata;
private SfvChecksumImpl checksumCollection;

FileBasedReceivedSnapshot(
final FileBasedSnapshotId snapshotId,
Expand Down Expand Up @@ -115,6 +116,12 @@ private void applyInternal(final SnapshotChunk snapshotChunk) throws SnapshotWri
LOGGER.trace("Consume snapshot snapshotChunk {} of snapshot {}", chunkName, snapshotId);
writeReceivedSnapshotChunk(snapshotChunk, snapshotFile);

if (checksumCollection == null) {
checksumCollection = new SfvChecksumImpl();
}
checksumCollection.updateFromBytes(
snapshotFile.getFileName().toString(), snapshotChunk.getContent());

if (snapshotChunk.getChunkName().equals(FileBasedSnapshotStore.METADATA_FILE_NAME)) {
try {
collectMetadata(snapshotChunk.getContent());
Expand Down Expand Up @@ -269,6 +276,13 @@ private void persistInternal(final CompletableActorFuture<PersistedSnapshot> fut
return;
}

if (expectedSnapshotChecksum != checksumCollection.getCombinedValue()) {
future.completeExceptionally(
new InvalidSnapshotChecksum(
directory, expectedSnapshotChecksum, checksumCollection.getCombinedValue()));
return;
}

try {
if (metadata == null) {
// backward compatibility
Expand All @@ -280,8 +294,7 @@ private void persistInternal(final CompletableActorFuture<PersistedSnapshot> fut
Long.MAX_VALUE);
}
final PersistedSnapshot value =
snapshotStore.persistNewSnapshot(
snapshotId, directory, expectedSnapshotChecksum, metadata);
snapshotStore.persistNewSnapshot(snapshotId, directory, checksumCollection, metadata);
future.complete(value);
} catch (final Exception e) {
future.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.MutableChecksumsSFV;
import io.camunda.zeebe.snapshots.ImmutableChecksumsSFV;
import io.camunda.zeebe.snapshots.PersistableSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
Expand Down Expand Up @@ -484,12 +484,10 @@ private boolean isCurrentSnapshotNewer(final FileBasedSnapshotId snapshotId) {
&& persistedSnapshot.getSnapshotId().compareTo(snapshotId) >= 0);
}

// TODO(npepinpe): using Either here would allow easy rollback regardless of when or where an
// exception is thrown, without having to catch and rollback for every possible case
FileBasedSnapshot persistNewSnapshot(
final FileBasedSnapshotId snapshotId,
final Path directory,
final long expectedChecksum,
final ImmutableChecksumsSFV immutableChecksumsSFV,
final FileBasedSnapshotMetadata metadata) {
final var currentPersistedSnapshot = currentPersistedSnapshotRef.get();

Expand All @@ -513,19 +511,8 @@ FileBasedSnapshot persistNewSnapshot(
moveToSnapshotDirectory(directory, destination);

final var checksumPath = buildSnapshotsChecksumPath(snapshotId);
final MutableChecksumsSFV actualChecksum;
try {
// computing the checksum on the final destination also lets us detect any failures during
// the
// copy/move that could occur
actualChecksum = SnapshotChecksum.calculate(destination);
if (actualChecksum.getCombinedValue() != expectedChecksum) {
rollbackPartialSnapshot(destination);
throw new InvalidSnapshotChecksum(
directory, expectedChecksum, actualChecksum.getCombinedValue());
}

SnapshotChecksum.persist(checksumPath, actualChecksum);
SnapshotChecksum.persist(checksumPath, immutableChecksumsSFV);
} catch (final IOException e) {
rollbackPartialSnapshot(destination);
throw new UncheckedIOException(e);
Expand All @@ -535,7 +522,7 @@ FileBasedSnapshot persistNewSnapshot(
new FileBasedSnapshot(
destination,
checksumPath,
actualChecksum.getCombinedValue(),
immutableChecksumsSFV.getCombinedValue(),
snapshotId,
metadata,
this::onSnapshotDeleted,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ private void persistInternal(final CompletableActorFuture<PersistedSnapshot> fut
snapshotId.getExportedPosition(),
lastFollowupEventPosition);
writeMetadataAndUpdateChecksum(metadata);
snapshot =
snapshotStore.persistNewSnapshot(
snapshotId, directory, checksum.getCombinedValue(), metadata);
snapshot = snapshotStore.persistNewSnapshot(snapshotId, directory, checksum, metadata);
future.complete(snapshot);
} catch (final Exception e) {
future.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ public void updateFromFile(final Path filePath) throws IOException {
checksums.put(fileName, checksum.getValue());
}

@Override
public void updateFromBytes(final String fileName, final byte[] bytes) {
combinedChecksum.update(fileName.getBytes(UTF_8));
final Checksum checksum = new CRC32C();
checksum.update(bytes);
combinedChecksum.update(bytes);
checksums.put(fileName, checksum.getValue());
}

@Override
public void updateFromSfvFile(final String... lines) {
for (String line : lines) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private static boolean isNotMetadataFile(final Path file) {

public static void persist(final Path checksumPath, final ImmutableChecksumsSFV checksum)
throws IOException {
try(final var stream = new FileOutputStream(checksumPath.toFile())) {
try (final var stream = new FileOutputStream(checksumPath.toFile())) {
checksum.write(stream);
}
}
Expand Down

0 comments on commit d74f357

Please sign in to comment.