diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ab03d2988..8c3be1151 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -21,11 +21,13 @@
+
+
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/ChunkManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/ChunkManager.java
index dfbb5be66..aaa6b94bc 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/ChunkManager.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/ChunkManager.java
@@ -19,39 +19,32 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.List;
import java.util.Optional;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import io.aiven.kafka.tieredstorage.cache.ChunkCache;
-import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
-import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
-import io.aiven.kafka.tieredstorage.transform.BaseDetransformChunkEnumeration;
-import io.aiven.kafka.tieredstorage.transform.DecompressionChunkEnumeration;
-import io.aiven.kafka.tieredstorage.transform.DecryptionChunkEnumeration;
-import io.aiven.kafka.tieredstorage.transform.DetransformChunkEnumeration;
-import io.aiven.kafka.tieredstorage.transform.DetransformFinisher;
+import io.aiven.kafka.tieredstorage.transform.TransformPipeline;
import org.apache.commons.io.IOUtils;
public class ChunkManager {
private final ObjectFetcher fetcher;
private final ObjectKey objectKey;
- private final AesEncryptionProvider aesEncryptionProvider;
private final ChunkCache chunkCache;
+ private final TransformPipeline transformPipeline;
public ChunkManager(final ObjectFetcher fetcher,
final ObjectKey objectKey,
- final AesEncryptionProvider aesEncryptionProvider,
- final ChunkCache chunkCache) {
+ final ChunkCache chunkCache,
+ final TransformPipeline transformPipeline) {
this.fetcher = fetcher;
this.objectKey = objectKey;
- this.aesEncryptionProvider = aesEncryptionProvider;
this.chunkCache = chunkCache;
+ this.transformPipeline = transformPipeline;
}
/**
@@ -63,19 +56,8 @@ public InputStream getChunk(final RemoteLogSegmentMetadata remoteLogSegmentMetad
final int chunkId) throws StorageBackendException {
final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);
final InputStream chunkContent = getChunkContent(remoteLogSegmentMetadata, chunk);
- DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk));
- final Optional<SegmentEncryptionMetadata> encryptionMetadata = manifest.encryption();
- if (encryptionMetadata.isPresent()) {
- detransformEnum = new DecryptionChunkEnumeration(
- detransformEnum,
- encryptionMetadata.get().ivSize(),
- encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk, encryptionMetadata.get())
- );
- }
- if (manifest.compression()) {
- detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
- }
- final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
+ final var detransformFinisher = transformPipeline.outboundTransformChain(chunkContent, manifest, chunk)
+ .complete();
return detransformFinisher.toInputStream();
}
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
index c97fdab14..ced42376d 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
@@ -16,9 +16,6 @@
package io.aiven.kafka.tieredstorage;
-import javax.crypto.SecretKey;
-import javax.crypto.spec.SecretKeySpec;
-
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
@@ -35,31 +32,17 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
-import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
-import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1;
-import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
-import io.aiven.kafka.tieredstorage.manifest.serde.DataKeyDeserializer;
-import io.aiven.kafka.tieredstorage.manifest.serde.DataKeySerializer;
import io.aiven.kafka.tieredstorage.metrics.Metrics;
-import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
-import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
-import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider;
import io.aiven.kafka.tieredstorage.storage.BytesRange;
import io.aiven.kafka.tieredstorage.storage.ObjectDeleter;
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
import io.aiven.kafka.tieredstorage.storage.ObjectUploader;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
-import io.aiven.kafka.tieredstorage.transform.BaseTransformChunkEnumeration;
-import io.aiven.kafka.tieredstorage.transform.CompressionChunkEnumeration;
-import io.aiven.kafka.tieredstorage.transform.EncryptionChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.FetchChunkEnumeration;
-import io.aiven.kafka.tieredstorage.transform.TransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.TransformFinisher;
+import io.aiven.kafka.tieredstorage.transform.TransformPipeline;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,13 +66,9 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
private ObjectDeleter deleter;
private boolean compressionEnabled;
private boolean compressionHeuristic;
- private boolean encryptionEnabled;
- private int chunkSize;
- private RsaEncryptionProvider rsaEncryptionProvider;
- private AesEncryptionProvider aesEncryptionProvider;
- private ObjectMapper mapper;
private ChunkManager chunkManager;
private ObjectKey objectKey;
+ private TransformPipeline transformPipeline;
private SegmentManifestProvider segmentManifestProvider;
@@ -111,49 +90,27 @@ public void configure(final Map<String, ?> configs) {
uploader = config.storage();
deleter = config.storage();
objectKey = new ObjectKey(config.keyPrefix());
- encryptionEnabled = config.encryptionEnabled();
- if (encryptionEnabled) {
- rsaEncryptionProvider = RsaEncryptionProvider.of(
- config.encryptionPublicKeyFile(),
- config.encryptionPrivateKeyFile()
- );
- aesEncryptionProvider = new AesEncryptionProvider();
- }
- chunkManager = new ChunkManager(
- fetcher,
- objectKey,
- aesEncryptionProvider,
- config.chunkCache()
- );
- chunkSize = config.chunkSize();
compressionEnabled = config.compressionEnabled();
compressionHeuristic = config.compressionHeuristicEnabled();
- mapper = getObjectMapper();
+ transformPipeline = TransformPipeline.newBuilder().fromConfig(config).build();
+ chunkManager = new ChunkManager(
+ fetcher,
+ objectKey,
+ config.chunkCache(),
+ transformPipeline
+ );
segmentManifestProvider = new SegmentManifestProvider(
objectKey,
config.segmentManifestCacheSize(),
config.segmentManifestCacheRetention(),
fetcher,
- mapper,
+ transformPipeline.objectMapper(),
executor);
}
- private ObjectMapper getObjectMapper() {
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(new Jdk8Module());
- if (encryptionEnabled) {
- final SimpleModule simpleModule = new SimpleModule();
- simpleModule.addSerializer(SecretKey.class, new DataKeySerializer(rsaEncryptionProvider::encryptDataKey));
- simpleModule.addDeserializer(SecretKey.class, new DataKeyDeserializer(
- b -> new SecretKeySpec(rsaEncryptionProvider.decryptDataKey(b), "AES")));
- objectMapper.registerModule(simpleModule);
- }
- return objectMapper;
- }
-
@Override
public void copyLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final LogSegmentData logSegmentData) throws RemoteStorageException {
@@ -165,27 +122,11 @@ public void copyLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMe
final long startedMs = time.milliseconds();
try {
- TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(
- Files.newInputStream(logSegmentData.logSegment()), chunkSize);
- SegmentEncryptionMetadataV1 encryptionMetadata = null;
- final boolean requiresCompression = requiresCompression(logSegmentData);
- if (requiresCompression) {
- transformEnum = new CompressionChunkEnumeration(transformEnum);
- }
- if (encryptionEnabled) {
- final DataKeyAndAAD dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
- transformEnum = new EncryptionChunkEnumeration(
- transformEnum,
- () -> aesEncryptionProvider.encryptionCipher(dataKeyAndAAD));
- encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
- }
- final TransformFinisher transformFinisher =
- new TransformFinisher(transformEnum, remoteLogSegmentMetadata.segmentSizeInBytes());
+ final var inboundTransformChain = transformPipeline.inboundTransformChain(logSegmentData.logSegment());
+ final var transformFinisher = inboundTransformChain.complete();
uploadSegmentLog(remoteLogSegmentMetadata, transformFinisher);
- final ChunkIndex chunkIndex = transformFinisher.chunkIndex();
- final SegmentManifest segmentManifest =
- new SegmentManifestV1(chunkIndex, requiresCompression, encryptionMetadata);
+ final SegmentManifest segmentManifest = transformPipeline.segmentManifest(transformFinisher.chunkIndex());
uploadManifest(remoteLogSegmentMetadata, segmentManifest);
final InputStream offsetIndex = Files.newInputStream(logSegmentData.offsetIndex());
@@ -248,10 +189,9 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta
private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final SegmentManifest segmentManifest)
throws StorageBackendException, IOException {
- final String manifest = mapper.writeValueAsString(segmentManifest);
- final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
-
- try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
+ final byte[] manifestBytes = transformPipeline.objectMapper().writeValueAsBytes(segmentManifest);
+ try (final var manifestContent = new ByteArrayInputStream(manifestBytes)) {
+ final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
uploader.upload(manifestContent, manifestFileKey);
}
}
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerConfig.java
index 72ad5905c..36a056df3 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerConfig.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerConfig.java
@@ -243,27 +243,27 @@ ChunkCache chunkCache() {
}
}
- String keyPrefix() {
+ public String keyPrefix() {
return getString(OBJECT_KEY_PREFIX_CONFIG);
}
- int chunkSize() {
+ public int chunkSize() {
return getInt(CHUNK_SIZE_CONFIG);
}
- boolean compressionEnabled() {
+ public boolean compressionEnabled() {
return getBoolean(COMPRESSION_ENABLED_CONFIG);
}
- boolean compressionHeuristicEnabled() {
+ public boolean compressionHeuristicEnabled() {
return getBoolean(COMPRESSION_HEURISTIC_ENABLED_CONFIG);
}
- boolean encryptionEnabled() {
+ public boolean encryptionEnabled() {
return getBoolean(ENCRYPTION_CONFIG);
}
- Path encryptionPublicKeyFile() {
+ public Path encryptionPublicKeyFile() {
final String value = getString(ENCRYPTION_PUBLIC_KEY_FILE_CONFIG);
if (value == null) {
return null;
@@ -271,7 +271,7 @@ Path encryptionPublicKeyFile() {
return Path.of(value);
}
- Path encryptionPrivateKeyFile() {
+ public Path encryptionPrivateKeyFile() {
final String value = getString(ENCRYPTION_PRIVATE_KEY_FILE_CONFIG);
if (value == null) {
return null;
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/InboundTransformChain.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/InboundTransformChain.java
new file mode 100644
index 000000000..bda22a8ab
--- /dev/null
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/InboundTransformChain.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.transform;
+
+import java.io.InputStream;
+import java.util.function.Function;
+
+public final class InboundTransformChain {
+ final int originalSize;
+ TransformChunkEnumeration inner;
+
+ InboundTransformChain(final InputStream content, final int size, final int chunkSize) {
+ this.originalSize = size;
+ this.inner = new BaseTransformChunkEnumeration(content, chunkSize);
+ }
+
+ void chain(final Function<TransformChunkEnumeration, TransformChunkEnumeration> transformSupplier) {
+ this.inner = transformSupplier.apply(this.inner);
+ }
+
+ public TransformFinisher complete() {
+ return new TransformFinisher(inner, originalSize);
+ }
+}
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/OutboundTransformChain.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/OutboundTransformChain.java
new file mode 100644
index 000000000..f26d511f1
--- /dev/null
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/OutboundTransformChain.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.transform;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.function.Function;
+
+import io.aiven.kafka.tieredstorage.Chunk;
+
+public final class OutboundTransformChain {
+ DetransformChunkEnumeration inner;
+
+ OutboundTransformChain(final InputStream uploadedData, final List<Chunk> chunks) {
+ this.inner = new BaseDetransformChunkEnumeration(uploadedData, chunks);
+ }
+
+ void chain(final Function<DetransformChunkEnumeration, DetransformChunkEnumeration> transform) {
+ this.inner = transform.apply(this.inner);
+ }
+
+ public DetransformFinisher complete() {
+ return new DetransformFinisher(inner);
+ }
+}
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformPipeline.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformPipeline.java
new file mode 100644
index 000000000..4fdcf0114
--- /dev/null
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformPipeline.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.transform;
+
+import javax.crypto.Cipher;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import io.aiven.kafka.tieredstorage.Chunk;
+import io.aiven.kafka.tieredstorage.RemoteStorageManagerConfig;
+import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
+import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
+import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
+import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1;
+import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
+import io.aiven.kafka.tieredstorage.manifest.serde.DataKeyDeserializer;
+import io.aiven.kafka.tieredstorage.manifest.serde.DataKeySerializer;
+import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
+import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
+import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+
+public class TransformPipeline {
+
+ final int chunkSize;
+ final boolean withCompression;
+ final boolean withEncryption;
+ final DataKeyAndAAD dataKeyAndAAD;
+ final int ivSize;
+ final Supplier<Cipher> inboundCipherSupplier;
+ final BiFunction<byte[], SegmentEncryptionMetadata, Cipher> outboundCipherSupplier;
+ final ObjectMapper objectMapper;
+
+ public TransformPipeline(final int chunkSize,
+ final boolean withCompression,
+ final boolean withEncryption,
+ final DataKeyAndAAD dataKeyAndAAD,
+ final int ivSize,
+ final Supplier<Cipher> inboundCipherSupplier,
+ final BiFunction<byte[], SegmentEncryptionMetadata, Cipher> outboundCipherSupplier,
+ final ObjectMapper objectMapper) {
+ this.chunkSize = chunkSize;
+ this.withCompression = withCompression;
+ this.withEncryption = withEncryption;
+ this.dataKeyAndAAD = dataKeyAndAAD;
+ this.ivSize = ivSize;
+ this.inboundCipherSupplier = inboundCipherSupplier;
+ this.outboundCipherSupplier = outboundCipherSupplier;
+ this.objectMapper = objectMapper;
+ }
+
+
+ public static TransformPipeline.Builder newBuilder() {
+ return new Builder();
+ }
+
+ public SegmentManifest segmentManifest(final ChunkIndex chunkIndex) {
+ SegmentEncryptionMetadataV1 encryption = null;
+ if (withEncryption) {
+ encryption = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
+ }
+ return new SegmentManifestV1(chunkIndex, withCompression, encryption);
+ }
+
+ public InboundTransformChain inboundTransformChain(final Path logPath) throws IOException {
+ return inboundTransformChain(Files.newInputStream(logPath), (int) Files.size(logPath));
+ }
+
+ public InboundTransformChain inboundTransformChain(final InputStream content, final int size) {
+ final Function<InboundTransformChain, InboundTransformChain> inboundFunction = inboundTransformChain -> {
+ if (withCompression) {
+ inboundTransformChain.chain(CompressionChunkEnumeration::new);
+ }
+ if (withEncryption) {
+ inboundTransformChain.chain(inboundTransform ->
+ new EncryptionChunkEnumeration(inboundTransform, inboundCipherSupplier));
+ }
+ return inboundTransformChain;
+ };
+ return inboundFunction.apply(new InboundTransformChain(content, size, chunkSize));
+ }
+
+ public OutboundTransformChain outboundTransformChain(final InputStream uploadedData,
+ final SegmentManifest manifest,
+ final Chunk chunk) {
+ return outboundTransformChain(uploadedData, manifest, List.of(chunk));
+ }
+
+ public OutboundTransformChain outboundTransformChain(final InputStream uploadedData,
+ final SegmentManifest manifest,
+ final List chunks) {
+ final Function<OutboundTransformChain, OutboundTransformChain> outboundFunction =
+ outboundTransformChain -> {
+ if (withEncryption) {
+ outboundTransformChain.chain(
+ outboundTransform ->
+ new DecryptionChunkEnumeration(
+ outboundTransform,
+ ivSize,
+ bytes -> outboundCipherSupplier.apply(bytes, manifest.encryption().get())));
+ }
+ if (withCompression) {
+ outboundTransformChain.chain(DecompressionChunkEnumeration::new);
+ }
+ return outboundTransformChain;
+ };
+ return outboundFunction.apply(new OutboundTransformChain(uploadedData, chunks));
+ }
+
+ public ObjectMapper objectMapper() {
+ return objectMapper;
+ }
+
+ public static class Builder {
+ private int chunkSize;
+ private boolean withEncryption = false;
+ private int ivSize = -1;
+ private Supplier<Cipher> inboundCipherSupplier = null;
+ private BiFunction<byte[], SegmentEncryptionMetadata, Cipher> outboundCipherSupplier = null;
+ private boolean withCompression = false;
+ private DataKeyAndAAD dataKeyAndAAD;
+ private RsaEncryptionProvider rsaEncryptionProvider;
+
+ private ObjectMapper objectMapper() {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(new Jdk8Module());
+ if (withEncryption) {
+ final SimpleModule simpleModule = new SimpleModule();
+ simpleModule.addSerializer(SecretKey.class,
+ new DataKeySerializer(rsaEncryptionProvider::encryptDataKey));
+ simpleModule.addDeserializer(SecretKey.class, new DataKeyDeserializer(
+ b -> new SecretKeySpec(rsaEncryptionProvider.decryptDataKey(b), "AES")));
+ objectMapper.registerModule(simpleModule);
+ }
+ return objectMapper;
+ }
+
+ public Builder withChunkSize(final int chunkSize) {
+ this.chunkSize = chunkSize;
+ return this;
+ }
+
+ public Builder withEncryption(final Path publicKeyFile, final Path privateKeyFile) {
+ rsaEncryptionProvider = RsaEncryptionProvider.of(publicKeyFile, privateKeyFile);
+ final AesEncryptionProvider aesEncryptionProvider = new AesEncryptionProvider();
+ dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
+ ivSize = aesEncryptionProvider.encryptionCipher(dataKeyAndAAD).getIV().length;
+ withEncryption = true;
+ inboundCipherSupplier = () -> aesEncryptionProvider.encryptionCipher(dataKeyAndAAD);
+ outboundCipherSupplier = aesEncryptionProvider::decryptionCipher;
+ return this;
+ }
+
+ public Builder withCompression() {
+ withCompression = true;
+ return this;
+ }
+
+ public Builder fromConfig(final RemoteStorageManagerConfig config) {
+ withChunkSize(config.chunkSize());
+ if (config.compressionEnabled()) {
+ withCompression();
+ }
+ if (config.encryptionEnabled()) {
+ withEncryption(config.encryptionPublicKeyFile(), config.encryptionPrivateKeyFile());
+ }
+ return this;
+ }
+
+ public TransformPipeline build() {
+ return new TransformPipeline(
+ chunkSize,
+ withCompression,
+ withEncryption,
+ dataKeyAndAAD,
+ ivSize,
+ inboundCipherSupplier,
+ outboundCipherSupplier,
+ objectMapper());
+ }
+ }
+}
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/ChunkManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/ChunkManagerTest.java
index 1422acc7a..b93da5b1d 100644
--- a/core/src/test/java/io/aiven/kafka/tieredstorage/ChunkManagerTest.java
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/ChunkManagerTest.java
@@ -35,6 +35,7 @@
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
import io.aiven.kafka.tieredstorage.storage.StorageBackend;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
+import io.aiven.kafka.tieredstorage.transform.TransformPipeline;
import com.github.luben.zstd.ZstdCompressCtx;
import org.junit.jupiter.api.BeforeEach;
@@ -51,7 +52,7 @@
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-class ChunkManagerTest extends AesKeyAwareTest {
+class ChunkManagerTest extends RsaKeyAwareTest {
static final byte[] TEST_CHUNK_CONTENT = "0123456789".getBytes();
@Mock
@@ -71,7 +72,8 @@ void testGetChunk() throws StorageBackendException {
final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, 10, 10);
final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null);
- final ChunkManager chunkManager = new ChunkManager(storage, objectKey, null, null);
+ final TransformPipeline transformPipeline = TransformPipeline.newBuilder().build();
+ final ChunkManager chunkManager = new ChunkManager(storage, objectKey, null, transformPipeline);
when(storage.fetch("test.log", chunkIndex.chunks().get(0).range()))
.thenReturn(new ByteArrayInputStream("0123456789".getBytes()));
@@ -95,8 +97,8 @@ void testGetChunkWithCaching() throws StorageBackendException {
final ChunkManager chunkManager = new ChunkManager(
storage,
objectKey,
- null,
- new UnboundInMemoryChunkCache()
+ new UnboundInMemoryChunkCache(),
+ TransformPipeline.newBuilder().build()
);
assertThat(chunkManager.getChunk(remoteLogSegmentMetadata, manifest, 0)).hasContent("0123456789");
@@ -132,8 +134,8 @@ void testGetChunkWithEncryption() throws Exception {
final ChunkManager chunkManager = new ChunkManager(
storage,
objectKey,
- aesEncryptionProvider,
- new UnboundInMemoryChunkCache()
+ new UnboundInMemoryChunkCache(),
+ TransformPipeline.newBuilder().withEncryption(publicKeyPem, privateKeyPem).build()
);
assertThat(chunkManager.getChunk(remoteLogSegmentMetadata, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT);
@@ -165,8 +167,8 @@ void testGetChunkWithCompression() throws Exception {
final ChunkManager chunkManager = new ChunkManager(
storage,
objectKey,
- null,
- new UnboundInMemoryChunkCache()
+ new UnboundInMemoryChunkCache(),
+ TransformPipeline.newBuilder().withCompression().build()
);
assertThat(chunkManager.getChunk(remoteLogSegmentMetadata, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT);
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java
index c3cffec83..79c0e2c5c 100644
--- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java
@@ -96,7 +96,8 @@ void setup() {
void test(final ChunkCache chunkCache,
final boolean readFully,
final BytesRange range) throws StorageBackendException, IOException {
- final var chunkManager = new ChunkManager(fetcher, objectKey, null, chunkCache);
+ final TransformPipeline transformPipeline = TransformPipeline.newBuilder().build();
+ final var chunkManager = new ChunkManager(fetcher, objectKey, chunkCache, transformPipeline);
final var is = new FetchChunkEnumeration(chunkManager, REMOTE_LOG_SEGMENT_METADATA, SEGMENT_MANIFEST, range)
.toInputStream();
if (readFully) {
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformPipelineTest.java
similarity index 60%
rename from core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java
rename to core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformPipelineTest.java
index 5668cc2a9..89493b9fc 100644
--- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformPipelineTest.java
@@ -18,10 +18,12 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.util.List;
import java.util.Random;
-import io.aiven.kafka.tieredstorage.AesKeyAwareTest;
-import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
+import io.aiven.kafka.tieredstorage.Chunk;
+import io.aiven.kafka.tieredstorage.RsaKeyAwareTest;
+import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
@@ -29,7 +31,7 @@
import static org.assertj.core.api.Assertions.assertThat;
-public class TransformsEndToEndTest extends AesKeyAwareTest {
+public class TransformPipelineTest extends RsaKeyAwareTest {
static final int ORIGINAL_SIZE = 1812004;
static byte[] original;
@@ -37,66 +39,65 @@ public class TransformsEndToEndTest extends AesKeyAwareTest {
@BeforeAll
static void init() {
original = new byte[ORIGINAL_SIZE];
- final var random = new Random();
+ final Random random = new Random();
random.nextBytes(original);
}
@ParameterizedTest
@ValueSource(ints = {1, 2, 3, 5, 13, 1024, 1024 * 2, 1024 * 5 + 3, ORIGINAL_SIZE - 1, ORIGINAL_SIZE * 2})
void plaintext(final int chunkSize) throws IOException {
- test(chunkSize, false, false);
+ test(TransformPipeline.newBuilder().withChunkSize(chunkSize).build());
}
@ParameterizedTest
// Small chunks would make encryption and compression tests very slow, skipping them
@ValueSource(ints = {1024 - 1, 1024, 1024 * 2 + 2, 1024 * 5 + 3, ORIGINAL_SIZE - 1, ORIGINAL_SIZE * 2})
void encryption(final int chunkSize) throws IOException {
- test(chunkSize, false, true);
+ test(TransformPipeline.newBuilder()
+ .withChunkSize(chunkSize)
+ .withEncryption(publicKeyPem, privateKeyPem)
+ .build());
}
@ParameterizedTest
// Small chunks would make compression tests going very slowly, skipping them
@ValueSource(ints = {1024 - 1, 1024, 1024 * 2 + 2, 1024 * 5 + 3, ORIGINAL_SIZE - 1, ORIGINAL_SIZE * 2})
void compression(final int chunkSize) throws IOException {
- test(chunkSize, true, false);
+ test(TransformPipeline.newBuilder()
+ .withChunkSize(chunkSize)
+ .withCompression()
+ .build());
}
@ParameterizedTest
// Small chunks would make compression tests going very slowly, skipping them
@ValueSource(ints = {1024 - 1, 1024, 1024 * 2 + 2, 1024 * 5 + 3, ORIGINAL_SIZE - 1, ORIGINAL_SIZE * 2})
void compressionAndEncryption(final int chunkSize) throws IOException {
- test(chunkSize, true, true);
+ test(TransformPipeline.newBuilder()
+ .withChunkSize(chunkSize)
+ .withCompression()
+ .withEncryption(publicKeyPem, privateKeyPem)
+ .build());
}
- private void test(final int chunkSize, final boolean compression, final boolean encryption) throws IOException {
+ private void test(final TransformPipeline pipeline) throws IOException {
// Transform.
- TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(
- new ByteArrayInputStream(original), chunkSize);
- if (compression) {
- transformEnum = new CompressionChunkEnumeration(transformEnum);
- }
- if (encryption) {
- transformEnum = new EncryptionChunkEnumeration(transformEnum, AesKeyAwareTest::encryptionCipherSupplier);
- }
- final var transformFinisher = new TransformFinisher(transformEnum, ORIGINAL_SIZE);
+ final ByteArrayInputStream inputContent = new ByteArrayInputStream(original);
+ final InboundTransformChain inboundTransformChain = pipeline.inboundTransformChain(inputContent, ORIGINAL_SIZE);
+ final TransformFinisher transformFinisher = inboundTransformChain.complete();
+
final byte[] uploadedData;
- final ChunkIndex chunkIndex;
try (final var sis = transformFinisher.toInputStream()) {
uploadedData = sis.readAllBytes();
- chunkIndex = transformFinisher.chunkIndex();
}
// Detransform.
- DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(
- new ByteArrayInputStream(uploadedData), chunkIndex.chunks());
- if (encryption) {
- detransformEnum = new DecryptionChunkEnumeration(
- detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier);
- }
- if (compression) {
- detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
- }
- final var detransformFinisher = new DetransformFinisher(detransformEnum);
+ final SegmentManifest segmentManifest = pipeline.segmentManifest(transformFinisher.chunkIndex());
+ final ByteArrayInputStream uploadedContent = new ByteArrayInputStream(uploadedData);
+ final List<Chunk> chunks = segmentManifest.chunkIndex().chunks();
+ final OutboundTransformChain outboundTransformChain =
+ pipeline.outboundTransformChain(uploadedContent, segmentManifest, chunks);
+ final var detransformFinisher = outboundTransformChain.complete();
try (final var sis = detransformFinisher.toInputStream()) {
final byte[] downloaded = sis.readAllBytes();
assertThat(downloaded).isEqualTo(original);