Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add transform pipeline #217

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
<suppress checks="ClassDataAbstractionCoupling" files=".*Test\.java"/>
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
<suppress checks="ClassFanOutComplexity" files="S3ClientWrapper.java"/>
<suppress checks="ClassFanOutComplexity" files="TransformPipeline.java"/>
<suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="S3ClientWrapper.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="S3RemoteStorageManagerConfig.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="ChunkManager.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="TransformPipeline.java"/>
<suppress checks="AbbreviationAsWordInName" files="DataKeyAndAADEqualsTest"/>
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,32 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Optional;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

import io.aiven.kafka.tieredstorage.cache.ChunkCache;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
import io.aiven.kafka.tieredstorage.transform.BaseDetransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.DecompressionChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.DecryptionChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.DetransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.DetransformFinisher;
import io.aiven.kafka.tieredstorage.transform.TransformPipeline;

import org.apache.commons.io.IOUtils;

public class ChunkManager {
private final ObjectFetcher fetcher;
private final ObjectKey objectKey;
private final AesEncryptionProvider aesEncryptionProvider;
private final ChunkCache chunkCache;
private final TransformPipeline transformPipeline;

public ChunkManager(final ObjectFetcher fetcher,
final ObjectKey objectKey,
final AesEncryptionProvider aesEncryptionProvider,
final ChunkCache chunkCache) {
final ChunkCache chunkCache,
final TransformPipeline transformPipeline) {
this.fetcher = fetcher;
this.objectKey = objectKey;
this.aesEncryptionProvider = aesEncryptionProvider;
this.chunkCache = chunkCache;
this.transformPipeline = transformPipeline;
}

/**
Expand All @@ -63,19 +56,8 @@ public InputStream getChunk(final RemoteLogSegmentMetadata remoteLogSegmentMetad
final int chunkId) throws StorageBackendException {
final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);
final InputStream chunkContent = getChunkContent(remoteLogSegmentMetadata, chunk);
DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk));
final Optional<SegmentEncryptionMetadata> encryptionMetadata = manifest.encryption();
if (encryptionMetadata.isPresent()) {
detransformEnum = new DecryptionChunkEnumeration(
detransformEnum,
encryptionMetadata.get().ivSize(),
encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk, encryptionMetadata.get())
);
}
if (manifest.compression()) {
detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
}
final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
final var detransformFinisher = transformPipeline.outboundTransformChain(chunkContent, manifest, chunk)
.complete();
return detransformFinisher.toInputStream();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package io.aiven.kafka.tieredstorage;

import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
Expand All @@ -35,31 +32,17 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1;
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.manifest.serde.DataKeyDeserializer;
import io.aiven.kafka.tieredstorage.manifest.serde.DataKeySerializer;
import io.aiven.kafka.tieredstorage.metrics.Metrics;
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider;
import io.aiven.kafka.tieredstorage.storage.BytesRange;
import io.aiven.kafka.tieredstorage.storage.ObjectDeleter;
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
import io.aiven.kafka.tieredstorage.storage.ObjectUploader;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
import io.aiven.kafka.tieredstorage.transform.BaseTransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.CompressionChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.EncryptionChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.FetchChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.TransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.TransformFinisher;
import io.aiven.kafka.tieredstorage.transform.TransformPipeline;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -83,13 +66,9 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
private ObjectDeleter deleter;
private boolean compressionEnabled;
private boolean compressionHeuristic;
private boolean encryptionEnabled;
private int chunkSize;
private RsaEncryptionProvider rsaEncryptionProvider;
private AesEncryptionProvider aesEncryptionProvider;
private ObjectMapper mapper;
private ChunkManager chunkManager;
private ObjectKey objectKey;
private TransformPipeline transformPipeline;

private SegmentManifestProvider segmentManifestProvider;

Expand All @@ -111,49 +90,27 @@ public void configure(final Map<String, ?> configs) {
uploader = config.storage();
deleter = config.storage();
objectKey = new ObjectKey(config.keyPrefix());
encryptionEnabled = config.encryptionEnabled();
if (encryptionEnabled) {
rsaEncryptionProvider = RsaEncryptionProvider.of(
config.encryptionPublicKeyFile(),
config.encryptionPrivateKeyFile()
);
aesEncryptionProvider = new AesEncryptionProvider();
}
chunkManager = new ChunkManager(
fetcher,
objectKey,
aesEncryptionProvider,
config.chunkCache()
);

chunkSize = config.chunkSize();
compressionEnabled = config.compressionEnabled();
compressionHeuristic = config.compressionHeuristicEnabled();

mapper = getObjectMapper();
transformPipeline = TransformPipeline.newBuilder().fromConfig(config).build();

chunkManager = new ChunkManager(
fetcher,
objectKey,
config.chunkCache(),
transformPipeline
);
segmentManifestProvider = new SegmentManifestProvider(
objectKey,
config.segmentManifestCacheSize(),
config.segmentManifestCacheRetention(),
fetcher,
mapper,
transformPipeline.objectMapper(),
executor);
}

private ObjectMapper getObjectMapper() {
final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module());
if (encryptionEnabled) {
final SimpleModule simpleModule = new SimpleModule();
simpleModule.addSerializer(SecretKey.class, new DataKeySerializer(rsaEncryptionProvider::encryptDataKey));
simpleModule.addDeserializer(SecretKey.class, new DataKeyDeserializer(
b -> new SecretKeySpec(rsaEncryptionProvider.decryptDataKey(b), "AES")));
objectMapper.registerModule(simpleModule);
}
return objectMapper;
}

@Override
public void copyLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final LogSegmentData logSegmentData) throws RemoteStorageException {
Expand All @@ -165,27 +122,11 @@ public void copyLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMe
final long startedMs = time.milliseconds();

try {
TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(
Files.newInputStream(logSegmentData.logSegment()), chunkSize);
SegmentEncryptionMetadataV1 encryptionMetadata = null;
final boolean requiresCompression = requiresCompression(logSegmentData);
if (requiresCompression) {
transformEnum = new CompressionChunkEnumeration(transformEnum);
}
if (encryptionEnabled) {
final DataKeyAndAAD dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
transformEnum = new EncryptionChunkEnumeration(
transformEnum,
() -> aesEncryptionProvider.encryptionCipher(dataKeyAndAAD));
encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
}
final TransformFinisher transformFinisher =
new TransformFinisher(transformEnum, remoteLogSegmentMetadata.segmentSizeInBytes());
final var inboundTransformChain = transformPipeline.inboundTransformChain(logSegmentData.logSegment());
final var transformFinisher = inboundTransformChain.complete();
uploadSegmentLog(remoteLogSegmentMetadata, transformFinisher);

final ChunkIndex chunkIndex = transformFinisher.chunkIndex();
final SegmentManifest segmentManifest =
new SegmentManifestV1(chunkIndex, requiresCompression, encryptionMetadata);
final SegmentManifest segmentManifest = transformPipeline.segmentManifest(transformFinisher.chunkIndex());
uploadManifest(remoteLogSegmentMetadata, segmentManifest);

final InputStream offsetIndex = Files.newInputStream(logSegmentData.offsetIndex());
Expand Down Expand Up @@ -248,10 +189,9 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta
private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final SegmentManifest segmentManifest)
throws StorageBackendException, IOException {
final String manifest = mapper.writeValueAsString(segmentManifest);
final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);

try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
final byte[] manifestBytes = transformPipeline.objectMapper().writeValueAsBytes(segmentManifest);
try (final var manifestContent = new ByteArrayInputStream(manifestBytes)) {
final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
uploader.upload(manifestContent, manifestFileKey);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,35 +243,35 @@ ChunkCache chunkCache() {
}
}

String keyPrefix() {
public String keyPrefix() {
return getString(OBJECT_KEY_PREFIX_CONFIG);
}

int chunkSize() {
public int chunkSize() {
return getInt(CHUNK_SIZE_CONFIG);
}

boolean compressionEnabled() {
public boolean compressionEnabled() {
return getBoolean(COMPRESSION_ENABLED_CONFIG);
}

boolean compressionHeuristicEnabled() {
public boolean compressionHeuristicEnabled() {
return getBoolean(COMPRESSION_HEURISTIC_ENABLED_CONFIG);
}

boolean encryptionEnabled() {
public boolean encryptionEnabled() {
return getBoolean(ENCRYPTION_CONFIG);
}

Path encryptionPublicKeyFile() {
public Path encryptionPublicKeyFile() {
final String value = getString(ENCRYPTION_PUBLIC_KEY_FILE_CONFIG);
if (value == null) {
return null;
}
return Path.of(value);
}

Path encryptionPrivateKeyFile() {
public Path encryptionPrivateKeyFile() {
final String value = getString(ENCRYPTION_PRIVATE_KEY_FILE_CONFIG);
if (value == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.transform;

import java.io.InputStream;
import java.util.function.Function;

/**
 * Composes the inbound (upload-side) chunk transformation chain.
 *
 * <p>Starts from a {@link BaseTransformChunkEnumeration} that splits the source content into
 * chunks of {@code chunkSize} bytes; each {@link #chain} call wraps the current head with a
 * further transformation (presumably compression/encryption steps — see the pipeline that
 * builds this chain). {@link #complete} seals the chain into a {@link TransformFinisher}.
 *
 * <p>Not thread-safe: {@link #chain} mutates {@code inner} in place.
 */
public final class InboundTransformChain {
    /** Size in bytes of the original, untransformed content; forwarded to the finisher. */
    final int originalSize;
    /** Current head of the transformation chain; replaced on every {@link #chain} call. */
    TransformChunkEnumeration inner;

    InboundTransformChain(final InputStream content, final int size, final int chunkSize) {
        this.originalSize = size;
        this.inner = new BaseTransformChunkEnumeration(content, chunkSize);
    }

    /**
     * Wraps the current chain head with one more transformation step.
     *
     * @param transform function producing the new head from the current one
     */
    // Renamed from "transformSupplier": the argument is a java.util.function.Function, not a
    // Supplier, and the sibling OutboundTransformChain already calls it "transform".
    void chain(final Function<TransformChunkEnumeration, TransformChunkEnumeration> transform) {
        this.inner = transform.apply(this.inner);
    }

    /** Completes the chain, returning a finisher over the fully composed transformation. */
    public TransformFinisher complete() {
        return new TransformFinisher(inner, originalSize);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.transform;

import java.io.InputStream;
import java.util.List;
import java.util.function.Function;

import io.aiven.kafka.tieredstorage.Chunk;

/**
 * Composes the outbound (fetch-side) detransformation chain.
 *
 * <p>The chain begins with a {@link BaseDetransformChunkEnumeration} over the fetched data and
 * the chunk list; each {@link #chain} call wraps the current head with a further
 * detransformation step. {@link #complete} seals the chain into a {@link DetransformFinisher}.
 *
 * <p>Not thread-safe: {@link #chain} mutates {@code inner} in place.
 */
public final class OutboundTransformChain {
    /** Current head of the detransformation chain; replaced on every {@link #chain} call. */
    DetransformChunkEnumeration inner;

    OutboundTransformChain(final InputStream uploadedData, final List<Chunk> chunks) {
        inner = new BaseDetransformChunkEnumeration(uploadedData, chunks);
    }

    /**
     * Wraps the current chain head with one more detransformation step.
     *
     * @param transform function producing the new head from the current one
     */
    void chain(final Function<DetransformChunkEnumeration, DetransformChunkEnumeration> transform) {
        inner = transform.apply(inner);
    }

    /** Completes the chain, returning a finisher over the fully composed detransformation. */
    public DetransformFinisher complete() {
        return new DetransformFinisher(inner);
    }
}
Loading