Skip to content

Commit

Permalink
refactor: decouple original size from chunking
Browse files Browse the repository at this point in the history
Chunking is defined by the original chunk size, not the original content size.
This PR removes the scenarios where these two sizes were conflated (e.g. passing an original size of zero to disable chunking).
  • Loading branch information
jeqo committed Apr 11, 2024
1 parent cdffc7f commit 5726bc9
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -457,23 +457,26 @@ InputStream transformIndex(final IndexType indexType,
);
}
final var transformFinisher = new TransformFinisher(transformEnum, size);
// Getting next element and expecting that it is the only one.
// No need to get a sequenced input stream
final var inputStream = transformFinisher.nextElement();
segmentIndexBuilder.add(indexType, singleChunk(transformFinisher.chunkIndex()).range().size());
final var chunkIndex = transformFinisher.chunkIndex();
// by getting a chunk index, means that the transformation is completed.
if (chunkIndex == null) {
throw new IllegalStateException("Chunking disabled when single chunk is expected");
}
if (chunkIndex.chunks().size() != 1) {
// not expected, as next element run once. But for safety
throw new IllegalStateException("Number of chunks different than 1, single chunk is expected");
}
segmentIndexBuilder.add(indexType, chunkIndex.chunks().get(0).range().size());
return inputStream;
} else {
segmentIndexBuilder.add(indexType, 0);
return InputStream.nullInputStream();
}
}

/**
 * Returns the sole chunk of the given index.
 *
 * @param chunkIndex index expected to contain exactly one chunk
 * @return the single chunk
 * @throws IllegalStateException if the index does not contain exactly one chunk
 */
private Chunk singleChunk(final ChunkIndex chunkIndex) {
    final var allChunks = chunkIndex.chunks();
    if (allChunks.size() == 1) {
        return allChunks.get(0);
    }
    throw new IllegalStateException("Single chunk expected when transforming indexes");
}

void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final ChunkIndex chunkIndex,
final SegmentIndexesV1 segmentIndexes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
* both transformed and not. We rely on this information for determining the transformed chunks borders
* in the input stream. We also can tell if the input stream has too few bytes.
*
* <p>An empty list of chunks means no chunking has been applied to the incoming stream
* and all content should be returned at once.
*/
public class BaseDetransformChunkEnumeration implements DetransformChunkEnumeration {
private final InputStream inputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ public class BaseTransformChunkEnumeration implements TransformChunkEnumeration

private byte[] chunk = null;

/**
 * Creates an enumeration with chunking disabled.
 *
 * <p>Sets the original chunk size to zero which, per the two-argument
 * constructor's contract, disables chunking.
 *
 * @param inputStream original content; must not be null
 */
public BaseTransformChunkEnumeration(final InputStream inputStream) {
this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null");

this.originalChunkSize = 0;
}

/**
* @param inputStream original content
* @param originalChunkSize chunk size from the <b>original</b> content. If zero, it disables chunking.
*/
public BaseTransformChunkEnumeration(final InputStream inputStream,
final int originalChunkSize) {
this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,43 +27,53 @@
import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndexBuilder;
import io.aiven.kafka.tieredstorage.manifest.index.VariableSizeChunkIndexBuilder;

// TODO test transforms and detransforms with property-based tests

/**
* The transformation finisher.
*
* <p>It converts enumeration of {@code byte[]} into enumeration of {@link InputStream},
* so that it could be used in {@link SequenceInputStream}.
*
* <p>It's responsible for building the chunk index.
* The chunk index is null if chunking has been disabled (i.e. the chunk size is zero).
* It may contain a single chunk if the chunk size is equal to or greater than the original file size;
* otherwise, the chunk index will contain more than one chunk.
*/
public class TransformFinisher implements Enumeration<InputStream> {
private final TransformChunkEnumeration inner;
private final AbstractChunkIndexBuilder chunkIndexBuilder;
private final int originalFileSize;
private ChunkIndex chunkIndex = null;

/**
 * Creates a finisher with an original file size of zero.
 *
 * <p>With a zero file size, {@code chunkIndex()} does not fail when no chunk
 * index was built — presumably the path used when chunking is disabled;
 * confirm against the chunk-index-builder handling.
 *
 * @param inner enumeration of transformed chunks; must not be null
 */
public TransformFinisher(final TransformChunkEnumeration inner) {
this(inner, 0);
}

/**
 * Creates a transformation finisher.
 *
 * @param inner            enumeration of transformed chunks; must not be null
 * @param originalFileSize size in bytes of the original (untransformed) content;
 *                         must be non-negative
 * @throws IllegalArgumentException if {@code originalFileSize} is negative
 */
public TransformFinisher(
    final TransformChunkEnumeration inner,
    final int originalFileSize
) {
    this.inner = Objects.requireNonNull(inner, "inner cannot be null");
    // Validate the argument before any further initialization.
    if (originalFileSize < 0) {
        throw new IllegalArgumentException(
            "originalFileSize must be non-negative, " + originalFileSize + " given");
    }
    this.originalFileSize = originalFileSize;
    this.chunkIndexBuilder = chunkIndexBuilder(inner, inner.originalChunkSize(), originalFileSize);
}

private static AbstractChunkIndexBuilder chunkIndexBuilder(
final TransformChunkEnumeration inner,
final int originalChunkSize,
final int originalFileSize
) {
final Integer transformedChunkSize = inner.transformedChunkSize();
if (originalFileSize == 0) {
this.chunkIndexBuilder = null;
} else if (transformedChunkSize == null) {
this.chunkIndexBuilder = new VariableSizeChunkIndexBuilder(inner.originalChunkSize(), originalFileSize);
if (transformedChunkSize == null) {
return new VariableSizeChunkIndexBuilder(
originalChunkSize,
originalFileSize
);
} else {
this.chunkIndexBuilder = new FixedSizeChunkIndexBuilder(
inner.originalChunkSize(), originalFileSize, transformedChunkSize);
return new FixedSizeChunkIndexBuilder(
originalChunkSize,
originalFileSize,
transformedChunkSize
);
}
}

Expand All @@ -75,7 +85,7 @@ public boolean hasMoreElements() {
@Override
public InputStream nextElement() {
final var chunk = inner.nextElement();
if (chunkIndexBuilder != null) {
if (inner.originalChunkSize() > 0) {
if (hasMoreElements()) {
this.chunkIndexBuilder.addChunk(chunk.length);
} else {
Expand All @@ -87,7 +97,7 @@ public InputStream nextElement() {
}

public ChunkIndex chunkIndex() {
if (chunkIndex == null && originalFileSize > 0) {
if (chunkIndex == null && inner.originalChunkSize() > 0) {
throw new IllegalStateException("Chunk index was not built, was finisher used?");
}
return this.chunkIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,39 +70,43 @@ void compressionAndEncryption(final int chunkSize) throws IOException {

/**
 * Round-trips {@code original} through the transform/detransform pipeline and
 * asserts the detransformed bytes equal the original.
 *
 * @param chunkSize   original chunk size to transform with
 * @param compression whether to add the compression/decompression stage
 * @param encryption  whether to add the encryption/decryption stage
 * @throws IOException if any stream operation fails
 */
private void test(final int chunkSize, final boolean compression, final boolean encryption) throws IOException {
    // Transform.
    try (final var inputStream = new ByteArrayInputStream(original)) {
        TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(inputStream, chunkSize);
        if (compression) {
            transformEnum = new CompressionChunkEnumeration(transformEnum);
        }
        if (encryption) {
            transformEnum = new EncryptionChunkEnumeration(
                transformEnum,
                AesKeyAwareTest::encryptionCipherSupplier
            );
        }
        final var transformFinisher = new TransformFinisher(transformEnum, ORIGINAL_SIZE);
        final byte[] uploadedData;
        final ChunkIndex chunkIndex;
        try (final var sis = transformFinisher.toInputStream()) {
            uploadedData = sis.readAllBytes();
            // Chunk index is available only after the stream is fully consumed.
            chunkIndex = transformFinisher.chunkIndex();
        }

        // Detransform.
        try (final var uploadedStream = new ByteArrayInputStream(uploadedData)) {
            // A null chunk index means chunking was disabled for this run.
            DetransformChunkEnumeration detransformEnum = chunkIndex == null
                ? new BaseDetransformChunkEnumeration(uploadedStream)
                : new BaseDetransformChunkEnumeration(uploadedStream, chunkIndex.chunks());
            if (encryption) {
                detransformEnum = new DecryptionChunkEnumeration(
                    detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier);
            }
            if (compression) {
                detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
            }
            final var detransformFinisher = new DetransformFinisher(detransformEnum);
            try (final var sis = detransformFinisher.toInputStream()) {
                final byte[] downloaded = sis.readAllBytes();
                assertThat(downloaded).isEqualTo(original);
            }
        }
    }
}

0 comments on commit 5726bc9

Please sign in to comment.