Skip to content

Commit

Permalink
merge: #14175
Browse files Browse the repository at this point in the history
14175: Refactor checksum collection writing and usage r=Zelldon a=Zelldon

## Description

The first part of this PR:

Instead of building a byte array with a byte output stream and writers, and then writing that array
to a stream again, the checksum collection now receives an OutputStream and writes directly to it.


Additionally, this PR contains #14184:

- Support updating the checksum collection from bytes that were already received or read from a file. This is useful if we want to avoid re-reading files.
- Support persisting a snapshot with a given checksum collection.
- The checksum collection is no longer recreated when persisting; the given collection is written directly to the directory.



<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

related #14045 



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
Co-authored-by: Christopher Kujawa (Zell) <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Sep 6, 2023
2 parents a6d01a6 + d74f357 commit 9efc1ec
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.camunda.zeebe.snapshots;

import java.io.IOException;
import java.io.OutputStream;

/**
* Immutable checksum collection in simple file verification (SFV) file format, which only allows to
Expand All @@ -21,7 +22,9 @@ public interface ImmutableChecksumsSFV {
long getCombinedValue();

/**
* @return the serialized SFV file data
* Write the checksum collection in SFV format to the given output stream.
*
* @param stream in which the data will be written to
*/
byte[] serializeSfvFileData() throws IOException;
void write(OutputStream stream) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ public interface MutableChecksumsSFV extends ImmutableChecksumsSFV {
*/
void updateFromFile(final Path filePath) throws IOException;

/**
* Updates the checksum collection by adding a new checksum that is calculated from the given
* bytes.
*
* <p>The bytes are likely the content of a file that was already read, or of a file that is about
* to be written. Passing them here is useful to avoid re-reading the file just to calculate its
* checksum.
*
* @param fileName the name of the file the given bytes relate to; this name is used for the entry
* in the checksum collection in SFV file format
* @param bytes the bytes for which the checksum should be created
*/
void updateFromBytes(final String fileName, final byte[] bytes);

/**
* Build the checksum collection from a SFV format string array.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class FileBasedReceivedSnapshot implements ReceivedSnapshot {
private long expectedSnapshotChecksum;
private int expectedTotalCount;
private FileBasedSnapshotMetadata metadata;
private SfvChecksumImpl checksumCollection;

FileBasedReceivedSnapshot(
final FileBasedSnapshotId snapshotId,
Expand Down Expand Up @@ -115,6 +116,12 @@ private void applyInternal(final SnapshotChunk snapshotChunk) throws SnapshotWri
LOGGER.trace("Consume snapshot snapshotChunk {} of snapshot {}", chunkName, snapshotId);
writeReceivedSnapshotChunk(snapshotChunk, snapshotFile);

if (checksumCollection == null) {
checksumCollection = new SfvChecksumImpl();
}
checksumCollection.updateFromBytes(
snapshotFile.getFileName().toString(), snapshotChunk.getContent());

if (snapshotChunk.getChunkName().equals(FileBasedSnapshotStore.METADATA_FILE_NAME)) {
try {
collectMetadata(snapshotChunk.getContent());
Expand Down Expand Up @@ -269,6 +276,13 @@ private void persistInternal(final CompletableActorFuture<PersistedSnapshot> fut
return;
}

if (expectedSnapshotChecksum != checksumCollection.getCombinedValue()) {
future.completeExceptionally(
new InvalidSnapshotChecksum(
directory, expectedSnapshotChecksum, checksumCollection.getCombinedValue()));
return;
}

try {
if (metadata == null) {
// backward compatibility
Expand All @@ -280,8 +294,7 @@ private void persistInternal(final CompletableActorFuture<PersistedSnapshot> fut
Long.MAX_VALUE);
}
final PersistedSnapshot value =
snapshotStore.persistNewSnapshot(
snapshotId, directory, expectedSnapshotChecksum, metadata);
snapshotStore.persistNewSnapshot(snapshotId, directory, checksumCollection, metadata);
future.complete(value);
} catch (final Exception e) {
future.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.MutableChecksumsSFV;
import io.camunda.zeebe.snapshots.ImmutableChecksumsSFV;
import io.camunda.zeebe.snapshots.PersistableSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
Expand Down Expand Up @@ -484,12 +484,10 @@ private boolean isCurrentSnapshotNewer(final FileBasedSnapshotId snapshotId) {
&& persistedSnapshot.getSnapshotId().compareTo(snapshotId) >= 0);
}

// TODO(npepinpe): using Either here would allow easy rollback regardless of when or where an
// exception is thrown, without having to catch and rollback for every possible case
FileBasedSnapshot persistNewSnapshot(
final FileBasedSnapshotId snapshotId,
final Path directory,
final long expectedChecksum,
final ImmutableChecksumsSFV immutableChecksumsSFV,
final FileBasedSnapshotMetadata metadata) {
final var currentPersistedSnapshot = currentPersistedSnapshotRef.get();

Expand All @@ -513,19 +511,8 @@ FileBasedSnapshot persistNewSnapshot(
moveToSnapshotDirectory(directory, destination);

final var checksumPath = buildSnapshotsChecksumPath(snapshotId);
final MutableChecksumsSFV actualChecksum;
try {
// computing the checksum on the final destination also lets us detect any failures during
// the
// copy/move that could occur
actualChecksum = SnapshotChecksum.calculate(destination);
if (actualChecksum.getCombinedValue() != expectedChecksum) {
rollbackPartialSnapshot(destination);
throw new InvalidSnapshotChecksum(
directory, expectedChecksum, actualChecksum.getCombinedValue());
}

SnapshotChecksum.persist(checksumPath, actualChecksum);
SnapshotChecksum.persist(checksumPath, immutableChecksumsSFV);
} catch (final IOException e) {
rollbackPartialSnapshot(destination);
throw new UncheckedIOException(e);
Expand All @@ -535,7 +522,7 @@ FileBasedSnapshot persistNewSnapshot(
new FileBasedSnapshot(
destination,
checksumPath,
actualChecksum.getCombinedValue(),
immutableChecksumsSFV.getCombinedValue(),
snapshotId,
metadata,
this::onSnapshotDeleted,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ private void persistInternal(final CompletableActorFuture<PersistedSnapshot> fut
snapshotId.getExportedPosition(),
lastFollowupEventPosition);
writeMetadataAndUpdateChecksum(metadata);
snapshot =
snapshotStore.persistNewSnapshot(
snapshotId, directory, checksum.getCombinedValue(), metadata);
snapshot = snapshotStore.persistNewSnapshot(snapshotId, directory, checksum, metadata);
future.complete(snapshot);
} catch (final Exception e) {
future.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import static java.nio.charset.StandardCharsets.UTF_8;

import io.camunda.zeebe.snapshots.MutableChecksumsSFV;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
Expand All @@ -34,15 +33,22 @@
*/
final class SfvChecksumImpl implements MutableChecksumsSFV {

private static final String FILE_CRC_SEPARATOR = " ";
public static final String FORMAT_NUMBER_OF_FILES_LINE =
"; number of files used for combined value = %d\n";
private static final String SFV_HEADER =
"""
; This is an SFC checksum file for all files in the given directory.
; You might use cksfv or another tool to validate these files manually.
; This is an automatically created file - please do NOT modify.
""";
private static final String FORMAT_SNAPSHOT_DIRECTORY_LINE = "; snapshot directory = %s\n";
private static final String FORMAT_FILE_CRC_LINE = "%s %s\n";
private static final String FORMAT_COMBINED_VALUE_LINE = "; combinedValue = %s\n";
private static final String FILE_CRC_SEPARATOR_REGEX = " {3}";
private static final Pattern FILE_CRC_PATTERN =
Pattern.compile("(.*)" + FILE_CRC_SEPARATOR_REGEX + "([0-9a-fA-F]{1,16})");
private static final String COMBINED_VALUE_PREFIX = "; combinedValue = ";
private static final Pattern COMBINED_VALUE_PATTERN =
Pattern.compile(".*combinedValue\\s+=\\s+([0-9a-fA-F]{1,16})");
private static final String SNAPSHOT_DIRECTORY_PREFIX = "; snapshot directory = ";

private Checksum combinedChecksum;
private final SortedMap<String, Long> checksums = new TreeMap<>();
private String snapshotDirectoryComment;
Expand All @@ -66,33 +72,19 @@ public long getCombinedValue() {
}

@Override
public byte[] serializeSfvFileData() throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(baos, UTF_8));
writer.write("; This is an SFC checksum file for all files in the given directory.");
writer.newLine();
writer.write("; You might use cksfv or another tool to validate these files manually.");
writer.newLine();
writer.write("; This is an automatically created file - please do NOT modify.");
writer.newLine();
public void write(final OutputStream stream) throws IOException {
final var writer = new PrintWriter(stream);
writer.print(SFV_HEADER);
if (snapshotDirectoryComment != null) {
writer.write(SNAPSHOT_DIRECTORY_PREFIX);
writer.write(snapshotDirectoryComment);
writer.newLine();
writer.printf(FORMAT_SNAPSHOT_DIRECTORY_LINE, snapshotDirectoryComment);
}
writer.write(COMBINED_VALUE_PREFIX);
writer.write(Long.toHexString(combinedChecksum.getValue()));
writer.newLine();
writer.write("; number of files used for combined value = " + checksums.size());
writer.newLine();
writer.printf(FORMAT_COMBINED_VALUE_LINE, Long.toHexString(combinedChecksum.getValue()));
writer.printf(FORMAT_NUMBER_OF_FILES_LINE, checksums.size());

for (final Entry<String, Long> entry : checksums.entrySet()) {
writer.write(entry.getKey());
writer.write(FILE_CRC_SEPARATOR);
writer.write(Long.toHexString(entry.getValue()));
writer.newLine();
writer.printf(FORMAT_FILE_CRC_LINE, entry.getKey(), Long.toHexString(entry.getValue()));
}
writer.flush();
return baos.toByteArray();
}

public void setSnapshotDirectoryComment(final String headerComment) {
Expand Down Expand Up @@ -130,6 +122,15 @@ public void updateFromFile(final Path filePath) throws IOException {
checksums.put(fileName, checksum.getValue());
}

/**
 * Feeds the UTF-8 encoded file name followed by the given bytes into the combined checksum, and
 * stores a separate CRC32C of the bytes under the file name.
 *
 * @param fileName the name under which the checksum of the bytes is stored
 * @param bytes the content to calculate the checksum for
 */
@Override
public void updateFromBytes(final String fileName, final byte[] bytes) {
  // the combined value covers the file name first, then the content
  final byte[] encodedFileName = fileName.getBytes(UTF_8);
  combinedChecksum.update(encodedFileName);
  combinedChecksum.update(bytes);

  // the per-file entry only covers the content
  final Checksum fileChecksum = new CRC32C();
  fileChecksum.update(bytes);
  checksums.put(fileName, fileChecksum.getValue());
}

@Override
public void updateFromSfvFile(final String... lines) {
for (String line : lines) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.snapshots.ImmutableChecksumsSFV;
import io.camunda.zeebe.snapshots.MutableChecksumsSFV;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -61,10 +62,8 @@ private static boolean isNotMetadataFile(final Path file) {

public static void persist(final Path checksumPath, final ImmutableChecksumsSFV checksum)
throws IOException {
try (final RandomAccessFile checksumFile = new RandomAccessFile(checksumPath.toFile(), "rwd")) {
final byte[] data = checksum.serializeSfvFileData();
checksumFile.write(data);
checksumFile.setLength(data.length);
try (final var stream = new FileOutputStream(checksumPath.toFile())) {
checksum.write(stream);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.junit.Before;
Expand Down Expand Up @@ -54,8 +55,10 @@ public void shouldReadAndWriteSameValues() throws IOException {
// given
final String[] givenSfvLines = {"; combinedValue = 12345678", "file1 aabbccdd"};
sfvChecksum.updateFromSfvFile(givenSfvLines);
final String serialized =
new String(sfvChecksum.serializeSfvFileData(), StandardCharsets.UTF_8);

final var arrayOutputStream = new ByteArrayOutputStream();
sfvChecksum.write(arrayOutputStream);
final String serialized = arrayOutputStream.toString(StandardCharsets.UTF_8);

// when
final String[] actualSfVlines = serialized.split(System.lineSeparator());
Expand All @@ -69,10 +72,11 @@ public void shouldReadAndWriteSameValues() throws IOException {
public void shouldWriteSnapshotDirectoryCommentIfPresent() throws IOException {
// given
sfvChecksum.setSnapshotDirectoryComment("/foo/bar");
final var arrayOutputStream = new ByteArrayOutputStream();
sfvChecksum.write(arrayOutputStream);

// when
final String serialized =
new String(sfvChecksum.serializeSfvFileData(), StandardCharsets.UTF_8);
final String serialized = arrayOutputStream.toString(StandardCharsets.UTF_8);

// then
assertThat(serialized).contains("; snapshot directory = /foo/bar");
Expand All @@ -81,10 +85,11 @@ public void shouldWriteSnapshotDirectoryCommentIfPresent() throws IOException {
@Test
public void shouldContainHumanReadableInstructions() throws IOException {
// given
final var arrayOutputStream = new ByteArrayOutputStream();
sfvChecksum.write(arrayOutputStream);

// when
final String serialized =
new String(sfvChecksum.serializeSfvFileData(), StandardCharsets.UTF_8);
final String serialized = arrayOutputStream.toString(StandardCharsets.UTF_8);

// then
assertThat(serialized)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.snapshots.ImmutableChecksumsSFV;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
Expand Down Expand Up @@ -183,10 +184,11 @@ public void shouldWriteTheNumberOfFiles() throws IOException {
createChunk(folder, "file2.txt");
createChunk(folder, "file3.txt");
final ImmutableChecksumsSFV sfvChecksum = SnapshotChecksum.calculate(folder);
final var arrayOutputStream = new ByteArrayOutputStream();
sfvChecksum.write(arrayOutputStream);

// when
final String serialized =
new String(sfvChecksum.serializeSfvFileData(), StandardCharsets.UTF_8);
final String serialized = arrayOutputStream.toString(StandardCharsets.UTF_8);

// then
assertThat(serialized).contains("; number of files used for combined value = 3");
Expand Down

0 comments on commit 9efc1ec

Please sign in to comment.