Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse already created checksum collections on persist #14184

Merged
merged 5 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ public interface MutableChecksumsSFV extends ImmutableChecksumsSFV {
*/
void updateFromFile(final Path filePath) throws IOException;

/**
* Update the checksum collection, and add a new checksum from given bytes, likely a read file or
* soon to be written file.
*
* <p>Useful, if we want to avoid re-reading files etc.
*
* @param fileName the name of the file (which relates to the given bytes), that is used in the
* checksum collection in SFV file format
* @param bytes the bytes for which the checksum should be created
*/
void updateFromBytes(final String fileName, final byte[] bytes);

/**
* Build the checksum collection from a SFV format string array.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class FileBasedReceivedSnapshot implements ReceivedSnapshot {
private long expectedSnapshotChecksum;
private int expectedTotalCount;
private FileBasedSnapshotMetadata metadata;
private SfvChecksumImpl checksumCollection;

FileBasedReceivedSnapshot(
final FileBasedSnapshotId snapshotId,
Expand Down Expand Up @@ -115,6 +116,12 @@ private void applyInternal(final SnapshotChunk snapshotChunk) throws SnapshotWri
LOGGER.trace("Consume snapshot snapshotChunk {} of snapshot {}", chunkName, snapshotId);
writeReceivedSnapshotChunk(snapshotChunk, snapshotFile);

if (checksumCollection == null) {
checksumCollection = new SfvChecksumImpl();
}
checksumCollection.updateFromBytes(
snapshotFile.getFileName().toString(), snapshotChunk.getContent());

if (snapshotChunk.getChunkName().equals(FileBasedSnapshotStore.METADATA_FILE_NAME)) {
try {
collectMetadata(snapshotChunk.getContent());
Expand Down Expand Up @@ -269,6 +276,13 @@ private void persistInternal(final CompletableActorFuture<PersistedSnapshot> fut
return;
}

if (expectedSnapshotChecksum != checksumCollection.getCombinedValue()) {
future.completeExceptionally(
new InvalidSnapshotChecksum(
directory, expectedSnapshotChecksum, checksumCollection.getCombinedValue()));
return;
}

try {
if (metadata == null) {
// backward compatibility
Expand All @@ -280,8 +294,7 @@ private void persistInternal(final CompletableActorFuture<PersistedSnapshot> fut
Long.MAX_VALUE);
}
final PersistedSnapshot value =
snapshotStore.persistNewSnapshot(
snapshotId, directory, expectedSnapshotChecksum, metadata);
snapshotStore.persistNewSnapshot(snapshotId, directory, checksumCollection, metadata);
future.complete(value);
} catch (final Exception e) {
future.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.MutableChecksumsSFV;
import io.camunda.zeebe.snapshots.ImmutableChecksumsSFV;
import io.camunda.zeebe.snapshots.PersistableSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
Expand Down Expand Up @@ -484,12 +484,10 @@ private boolean isCurrentSnapshotNewer(final FileBasedSnapshotId snapshotId) {
&& persistedSnapshot.getSnapshotId().compareTo(snapshotId) >= 0);
}

// TODO(npepinpe): using Either here would allow easy rollback regardless of when or where an
// exception is thrown, without having to catch and rollback for every possible case
FileBasedSnapshot persistNewSnapshot(
final FileBasedSnapshotId snapshotId,
final Path directory,
final long expectedChecksum,
final ImmutableChecksumsSFV immutableChecksumsSFV,
final FileBasedSnapshotMetadata metadata) {
final var currentPersistedSnapshot = currentPersistedSnapshotRef.get();

Expand All @@ -513,19 +511,8 @@ FileBasedSnapshot persistNewSnapshot(
moveToSnapshotDirectory(directory, destination);

final var checksumPath = buildSnapshotsChecksumPath(snapshotId);
final MutableChecksumsSFV actualChecksum;
try {
// computing the checksum on the final destination also lets us detect any failures during
// the
// copy/move that could occur
actualChecksum = SnapshotChecksum.calculate(destination);
if (actualChecksum.getCombinedValue() != expectedChecksum) {
rollbackPartialSnapshot(destination);
throw new InvalidSnapshotChecksum(
directory, expectedChecksum, actualChecksum.getCombinedValue());
}

SnapshotChecksum.persist(checksumPath, actualChecksum);
SnapshotChecksum.persist(checksumPath, immutableChecksumsSFV);
} catch (final IOException e) {
rollbackPartialSnapshot(destination);
throw new UncheckedIOException(e);
Expand All @@ -535,7 +522,7 @@ FileBasedSnapshot persistNewSnapshot(
new FileBasedSnapshot(
destination,
checksumPath,
actualChecksum.getCombinedValue(),
immutableChecksumsSFV.getCombinedValue(),
snapshotId,
metadata,
this::onSnapshotDeleted,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ private void persistInternal(final CompletableActorFuture<PersistedSnapshot> fut
snapshotId.getExportedPosition(),
lastFollowupEventPosition);
writeMetadataAndUpdateChecksum(metadata);
snapshot =
snapshotStore.persistNewSnapshot(
snapshotId, directory, checksum.getCombinedValue(), metadata);
snapshot = snapshotStore.persistNewSnapshot(snapshotId, directory, checksum, metadata);
future.complete(snapshot);
} catch (final Exception e) {
future.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ public void updateFromFile(final Path filePath) throws IOException {
checksums.put(fileName, checksum.getValue());
}

@Override
public void updateFromBytes(final String fileName, final byte[] bytes) {
combinedChecksum.update(fileName.getBytes(UTF_8));
final Checksum checksum = new CRC32C();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 You can create it once for the whole class, and just call CRC32C#reset before every usage. Very minor optimization 🙃

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I keep it for now.

The CRC323C#reset :D

    public void reset() {
        this.crc = -1;
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at how it works, this is the initial value 🙃 Then when you pass things it gets updated. So it is, in fact, resetting the state to the initial values ;)

checksum.update(bytes);
combinedChecksum.update(bytes);
checksums.put(fileName, checksum.getValue());
}

@Override
public void updateFromSfvFile(final String... lines) {
for (String line : lines) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private static boolean isNotMetadataFile(final Path file) {

public static void persist(final Path checksumPath, final ImmutableChecksumsSFV checksum)
throws IOException {
try(final var stream = new FileOutputStream(checksumPath.toFile())) {
try (final var stream = new FileOutputStream(checksumPath.toFile())) {
Copy link
Member

@npepinpe npepinpe Sep 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ups 🙈 my bad

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah :D seems to be from the other PR :D

checksum.write(stream);
}
}
Expand Down