diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java index 2fcb597984..d15722e493 100644 --- a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java @@ -22,6 +22,7 @@ import kafka.log.es.api.StreamClient; import kafka.log.s3.cache.DefaultS3BlockCache; import kafka.log.s3.cache.S3BlockCache; +import kafka.log.s3.compact.CompactionManager; import kafka.log.s3.metadata.StreamMetadataManager; import kafka.log.s3.network.ControllerRequestSender; import kafka.log.s3.network.ControllerRequestSender.RetryPolicyContext; @@ -51,6 +52,8 @@ public class DefaultS3Client implements Client { private final StreamManager streamManager; + private final CompactionManager compactionManager; + private final S3StreamClient streamClient; private final KVClient kvClient; @@ -65,6 +68,8 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator this.streamManager = new ControllerStreamManager(this.requestSender, config); this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config); this.blockCache = new DefaultS3BlockCache(config.s3CacheSize(), objectManager, operator); + this.compactionManager = new CompactionManager(this.config, this.objectManager, this.metadataManager, this.operator); + this.compactionManager.start(); this.storage = new S3Storage(config, new MemoryWriteAheadLog(), objectManager, blockCache, operator); this.streamClient = new S3StreamClient(this.streamManager, this.storage, this.objectManager, this.operator, this.config); this.kvClient = new ControllerKVClient(this.requestSender); @@ -81,6 +86,7 @@ public KVClient kvClient() { } public void shutdown() { + this.compactionManager.shutdown(); this.streamClient.shutdown(); } } diff --git a/core/src/main/scala/kafka/log/s3/compact/CompactResult.java b/core/src/main/scala/kafka/log/s3/compact/CompactResult.java new file mode 100644 index 0000000000..72d58de9e3 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/compact/CompactResult.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.compact; + +public enum CompactResult { + SUCCESS, + SKIPPED, + FAILED + +} diff --git a/core/src/main/scala/kafka/log/s3/compact/CompactionAnalyzer.java b/core/src/main/scala/kafka/log/s3/compact/CompactionAnalyzer.java new file mode 100644 index 0000000000..1fc5f4610b --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/compact/CompactionAnalyzer.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.compact; + +import kafka.log.s3.compact.objects.CompactedObject; +import kafka.log.s3.compact.objects.CompactedObjectBuilder; +import kafka.log.s3.compact.objects.CompactionType; +import kafka.log.s3.compact.objects.StreamDataBlock; +import kafka.log.s3.compact.operator.DataBlockReader; +import kafka.log.s3.operator.S3Operator; +import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.S3WALObjectMetadata; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class CompactionAnalyzer { + private final static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CompactionAnalyzer.class); + private final long compactionCacheSize; + private final double executionScoreThreshold; + private final long streamSplitSize; + private final S3Operator s3Operator; + + public CompactionAnalyzer(long compactionCacheSize, double executionScoreThreshold, long streamSplitSize, S3Operator s3Operator) { + this.compactionCacheSize = compactionCacheSize; + this.executionScoreThreshold = executionScoreThreshold; + this.streamSplitSize = streamSplitSize; + this.s3Operator = s3Operator; + } + + public List analyze(List objectMetadataList) { + List compactionPlans = new ArrayList<>(); + try { + List compactedObjectBuilders = buildCompactedObjects(objectMetadataList); + List compactedObjects = new ArrayList<>(); + CompactedObjectBuilder compactedWALObjectBuilder = null; + long totalSize = 0L; + for (int i = 0; i < compactedObjectBuilders.size(); ) { + CompactedObjectBuilder compactedObjectBuilder = compactedObjectBuilders.get(i); + if (totalSize + compactedObjectBuilder.totalBlockSize() > compactionCacheSize) { + if (shouldSplitObject(compactedObjectBuilder)) { + // split object to fit into cache + int endOffset = 0; + for (int j = 0; j < compactedObjectBuilder.streamDataBlocks().size(); j++) { + if (totalSize + compactedObjectBuilder.streamDataBlocks().get(j).getBlockSize() > compactionCacheSize) { + endOffset = j; + break; + } + } + if (endOffset != 0) { + CompactedObjectBuilder builder = compactedObjectBuilder.split(0, endOffset); + compactedWALObjectBuilder = addOrMergeCompactedObject(builder, compactedObjects, compactedWALObjectBuilder); + } + } + compactionPlans.add(generateCompactionPlan(compactedObjects, compactedWALObjectBuilder)); + compactedObjects.clear(); + compactedWALObjectBuilder = null; + totalSize = 0; + } else { + // object fits into cache size + compactedWALObjectBuilder = addOrMergeCompactedObject(compactedObjectBuilder, compactedObjects, compactedWALObjectBuilder); + totalSize += compactedObjectBuilder.totalBlockSize(); + i++; + } + + } + if (!compactedObjects.isEmpty()) { + compactionPlans.add(generateCompactionPlan(compactedObjects, compactedWALObjectBuilder)); 
+ } + return compactionPlans; + } catch (Exception e) { + LOGGER.error("Error while analyzing compaction plan", e); + } + return compactionPlans; + } + + private CompactedObjectBuilder addOrMergeCompactedObject(CompactedObjectBuilder compactedObjectBuilder, + List compactedObjects, + CompactedObjectBuilder compactedWALObjectBuilder) { + if (compactedObjectBuilder.type() == CompactionType.SPLIT) { + compactedObjects.add(compactedObjectBuilder.build()); + } else { + if (compactedWALObjectBuilder == null) { + compactedWALObjectBuilder = new CompactedObjectBuilder(); + } + compactedWALObjectBuilder.merge(compactedObjectBuilder); + } + return compactedWALObjectBuilder; + } + + private boolean shouldSplitObject(CompactedObjectBuilder compactedObjectBuilder) { + //TODO: split object depends on available cache size and current object size + //TODO: use multipart upload to upload split stream object + return true; + } + + private CompactionPlan generateCompactionPlan(List compactedObjects, CompactedObjectBuilder compactedWALObject) { + if (compactedWALObject != null) { + compactedObjects.add(compactedWALObject.build()); + } + Map> streamDataBlockMap = new HashMap<>(); + for (CompactedObject compactedObject : compactedObjects) { + for (StreamDataBlock streamDataBlock : compactedObject.streamDataBlocks()) { + streamDataBlockMap.computeIfAbsent(streamDataBlock.getObjectId(), k -> new ArrayList<>()).add(streamDataBlock); + } + } + for (List dataBlocks : streamDataBlockMap.values()) { + dataBlocks.sort(StreamDataBlock.BLOCK_POSITION_COMPARATOR); + } + + return new CompactionPlan(new ArrayList<>(compactedObjects), streamDataBlockMap); + } + + public List buildCompactedObjects(List objects) { + List streamDataBlocks = blockWaitObjectIndices(objects); + if (!shouldCompact(streamDataBlocks)) { + return new ArrayList<>(); + } + return compactObjects(sortStreamRangePositions(streamDataBlocks)); + } + + List blockWaitObjectIndices(List objectMetadataList) { + Map s3WALObjectMap = objectMetadataList.stream() + .collect(Collectors.toMap(e -> e.getWalObject().objectId(), S3WALObjectMetadata::getWalObject)); + List>> objectStreamRangePositionFutures = new ArrayList<>(); + for (S3WALObjectMetadata walObjectMetadata : objectMetadataList) { + DataBlockReader dataBlockReader = new DataBlockReader(walObjectMetadata.getObjectMetadata(), s3Operator); + dataBlockReader.parseDataBlockIndex(); + objectStreamRangePositionFutures.add(dataBlockReader.getDataBlockIndex()); + } + return objectStreamRangePositionFutures.stream().flatMap(f -> { + try { + List streamDataBlocks = f.join(); + List validStreamDataBlocks = new ArrayList<>(); + S3WALObject s3WALObject = s3WALObjectMap.get(streamDataBlocks.get(0).getObjectId()); + // filter out invalid stream data blocks in case metadata is inconsistent with S3 index block + for (StreamDataBlock streamDataBlock : streamDataBlocks) { + if (s3WALObject.intersect(streamDataBlock.getStreamId(), streamDataBlock.getStartOffset(), streamDataBlock.getEndOffset())) { + validStreamDataBlocks.add(streamDataBlock); + } + } + return validStreamDataBlocks.stream(); + } catch (Exception ex) { + // continue compaction without invalid object + LOGGER.error("Read on invalid object ", ex); + return null; + } + }).collect(Collectors.toList()); + } + + private List compactObjects(List streamDataBlocks) { + List compactedObjectBuilders = new ArrayList<>(); + CompactedObjectBuilder builder = new CompactedObjectBuilder(); + for (StreamDataBlock streamDataBlock : streamDataBlocks) { + if (builder.lastStreamId() 
== -1L) {
+                // init state
+                builder.addStreamDataBlock(streamDataBlock);
+            } else if (builder.lastStreamId() == streamDataBlock.getStreamId()) {
+                // data range from same stream
+                if (streamDataBlock.getStartOffset() > builder.lastOffset()) {
+                    // data range is not continuous, split current object as StreamObject
+                    builder = splitObject(builder, compactedObjectBuilders);
+                    builder.addStreamDataBlock(streamDataBlock);
+                } else if (streamDataBlock.getStartOffset() == builder.lastOffset()) {
+                    builder.addStreamDataBlock(streamDataBlock);
+                } else {
+                    // should never happen
+                    LOGGER.error("FATAL ERROR: illegal stream range position, last offset: {}, curr: {}",
+                            builder.lastOffset(), streamDataBlock);
+                    return new ArrayList<>();
+                }
+            } else {
+                builder = splitAndAddBlock(builder, streamDataBlock, compactedObjectBuilders);
+            }
+        }
+        compactedObjectBuilders.add(builder);
+        return compactedObjectBuilders;
+    }
+
+    boolean shouldCompact(List<StreamDataBlock> streamDataBlocks) {
+        // do compact if there is any stream with data placed in more than one WAL object
+        Map<Long, Integer> streamIdToDistinctObjectMap = streamDataBlocks.stream()
+                .collect(Collectors.groupingBy(StreamDataBlock::getStreamId, Collectors.mapping(StreamDataBlock::getObjectId, Collectors.toSet())))
+                .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+        return streamIdToDistinctObjectMap.values().stream().filter(v -> v > 1).count() > 1;
+    }
+
+    private CompactedObjectBuilder splitAndAddBlock(CompactedObjectBuilder builder,
+                                                    StreamDataBlock streamDataBlock,
+                                                    List<CompactedObjectBuilder> compactedObjectBuilders) {
+        if (builder.currStreamBlockSize() > streamSplitSize) {
+            builder = splitObject(builder, compactedObjectBuilders);
+        }
+        builder.addStreamDataBlock(streamDataBlock);
+        return builder;
+    }
+
+    private CompactedObjectBuilder splitObject(CompactedObjectBuilder builder,
+                                               List<CompactedObjectBuilder> compactedObjectBuilders) {
+        CompactedObjectBuilder splitBuilder = builder.splitCurrentStream();
+        splitBuilder.setType(CompactionType.SPLIT);
+        if (builder.totalBlockSize() != 0) {
+            compactedObjectBuilders.add(builder);
+        }
+        compactedObjectBuilders.add(splitBuilder);
+        builder = new CompactedObjectBuilder();
+        return builder;
+    }
+
+    List<StreamDataBlock> sortStreamRangePositions(List<StreamDataBlock> streamDataBlocks) {
+        //TODO: use merge sort
+        Map<Long, List<StreamDataBlock>> sortedStreamObjectMap = new TreeMap<>();
+        for (StreamDataBlock streamDataBlock : streamDataBlocks) {
+            sortedStreamObjectMap.computeIfAbsent(streamDataBlock.getStreamId(), k -> new ArrayList<>()).add(streamDataBlock);
+        }
+        return sortedStreamObjectMap.values().stream().flatMap(list -> {
+            list.sort(StreamDataBlock.STREAM_OFFSET_COMPARATOR);
+            return list.stream();
+        }).collect(Collectors.toList());
+    }
+
+}
diff --git a/core/src/main/scala/kafka/log/s3/compact/CompactionManager.java b/core/src/main/scala/kafka/log/s3/compact/CompactionManager.java
new file mode 100644
index 0000000000..8df10adc24
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/compact/CompactionManager.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.s3.compact; + +import io.netty.util.concurrent.DefaultThreadFactory; +import kafka.log.s3.compact.objects.CompactedObject; +import kafka.log.s3.compact.objects.CompactionType; +import kafka.log.s3.compact.objects.StreamDataBlock; +import kafka.log.s3.compact.operator.DataBlockReader; +import kafka.log.s3.metadata.StreamMetadataManager; +import kafka.log.s3.objects.CommitWALObjectRequest; +import kafka.log.s3.objects.ObjectManager; +import kafka.log.s3.objects.ObjectStreamRange; +import kafka.log.s3.objects.StreamObject; +import kafka.log.s3.operator.S3Operator; +import kafka.server.KafkaConfig; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; +import org.apache.kafka.metadata.stream.S3WALObjectMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class CompactionManager { + private final static Logger LOGGER = LoggerFactory.getLogger(CompactionManager.class); + private final ObjectManager objectManager; + private final StreamMetadataManager streamMetadataManager; + private final S3Operator s3Operator; + private final CompactionAnalyzer compactionAnalyzer; + private final ScheduledExecutorService executorService; + private final CompactionUploader uploader; + private final long compactionCacheSize; + private final double executionScoreThreshold; + private final long streamSplitSize; + private final int compactionInterval; + private final int forceSplitObjectPeriod; + private final TokenBucketThrottle networkInThrottle; + + public CompactionManager(KafkaConfig config, ObjectManager objectManager, StreamMetadataManager streamMetadataManager, S3Operator s3Operator) { + this.objectManager = objectManager; + this.streamMetadataManager = streamMetadataManager; + this.s3Operator = s3Operator; + this.compactionInterval = config.s3ObjectCompactionInterval(); + this.compactionCacheSize = config.s3ObjectCompactionCacheSize(); + this.executionScoreThreshold = config.s3ObjectCompactionExecutionScoreThreshold(); + this.streamSplitSize = config.s3ObjectCompactionStreamSplitSize(); + this.forceSplitObjectPeriod = config.s3ObjectCompactionForceSplitPeriod(); + this.networkInThrottle = new TokenBucketThrottle(config.s3ObjectCompactionNWInBandwidth()); + this.uploader = new CompactionUploader(objectManager, s3Operator, config); + this.compactionAnalyzer = new CompactionAnalyzer(compactionCacheSize, executionScoreThreshold, streamSplitSize, s3Operator); + this.executorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("object-compaction-manager")); + } + + public void start() { + this.executorService.scheduleWithFixedDelay(() -> { + long start = System.currentTimeMillis(); + this.compact() + .thenAccept(result -> LOGGER.info("Compaction complete, cost {} ms, result {}", + System.currentTimeMillis() - start, result)) + 
                    .exceptionally(ex -> {
+                        LOGGER.error("Compaction failed, cost {} ms", System.currentTimeMillis() - start, ex);
+                        return null;
+                    });
+        }, 0, this.compactionInterval, TimeUnit.MINUTES);
+    }
+
+    public void shutdown() {
+        this.executorService.shutdown();
+        this.networkInThrottle.stop();
+        this.uploader.stop();
+    }
+
+    public CompletableFuture<CompactResult> compact() {
+        List<S3WALObjectMetadata> s3ObjectMetadata = this.streamMetadataManager.getWALObjects();
+        Map<Boolean, List<S3WALObjectMetadata>> objectMetadataFilterMap = s3ObjectMetadata.stream()
+                .collect(Collectors.partitioningBy(e -> (System.currentTimeMillis() - e.getWalObject().dataTimeInMs())
+                        >= this.forceSplitObjectPeriod));
+        // force split objects that have existed for too long
+        splitWALObjects(objectMetadataFilterMap.get(true)).thenAccept(v -> LOGGER.info("Force split {} objects",
+                objectMetadataFilterMap.get(true).size()));
+
+        try {
+            List<CompactionPlan> compactionPlans = this.compactionAnalyzer.analyze(objectMetadataFilterMap.get(false));
+            if (compactionPlans.isEmpty()) {
+                return CompletableFuture.completedFuture(CompactResult.SKIPPED);
+            }
+            CommitWALObjectRequest request = buildCompactRequest(compactionPlans, s3ObjectMetadata);
+            return objectManager.commitWALObject(request).thenApply(nil -> {
+                LOGGER.info("Commit compact request succeeded, WAL object id: {}, size: {}, stream object num: {}",
+                        request.getObjectId(), request.getObjectSize(), request.getStreamObjects().size());
+                return CompactResult.SUCCESS;
+            });
+        } catch (Exception e) {
+            LOGGER.error("Error while compacting objects", e);
+            return CompletableFuture.failedFuture(e);
+        }
+
+    }
+
+    public CompletableFuture<Void> forceSplitAll() {
+        CompletableFuture<Void> cf = new CompletableFuture<>();
+        //TODO: deal with metadata delay
+        this.executorService.execute(() -> splitWALObjects(this.streamMetadataManager.getWALObjects())
+                .thenAccept(v -> cf.complete(null))
+                .exceptionally(ex -> {
+                    cf.completeExceptionally(ex);
+                    return null;
+                })
+        );
+        return cf;
+    }
+
+    CompletableFuture<Void> splitWALObjects(List<S3WALObjectMetadata> objectMetadataList) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    CommitWALObjectRequest buildCompactRequest(List<CompactionPlan> compactionPlans, List<S3WALObjectMetadata> s3ObjectMetadata)
+            throws IllegalArgumentException {
+        CommitWALObjectRequest request = new CommitWALObjectRequest();
+        Map<Long, S3WALObjectMetadata> s3ObjectMetadataMap = s3ObjectMetadata.stream()
+                .collect(Collectors.toMap(e -> e.getWalObject().objectId(), e -> e));
+        for (CompactionPlan compactionPlan : compactionPlans) {
+            // iterate over each compaction plan
+            for (Map.Entry<Long, List<StreamDataBlock>> streamDataBlocEntry : compactionPlan.streamDataBlocksMap().entrySet()) {
+                S3ObjectMetadata metadata = s3ObjectMetadataMap.get(streamDataBlocEntry.getKey()).getObjectMetadata();
+                List<StreamDataBlock> streamDataBlocks = streamDataBlocEntry.getValue();
+                List<DataBlockReader.DataBlockIndex> blockIndices = CompactionUtils.buildBlockIndicesFromStreamDataBlock(streamDataBlocks);
+                networkInThrottle.throttle(streamDataBlocks.stream().mapToLong(StreamDataBlock::getBlockSize).sum());
+                DataBlockReader reader = new DataBlockReader(metadata, s3Operator);
+                reader.readBlocks(blockIndices).thenAccept(dataBlocks -> {
+                    for (int i = 0; i < blockIndices.size(); i++) {
+                        StreamDataBlock streamDataBlock = streamDataBlocks.get(i);
+                        streamDataBlock.getDataCf().complete(dataBlocks.get(i).buffer());
+                    }
+                }).exceptionally(ex -> {
+                    LOGGER.error("read on invalid object {}, ex ", metadata.key(), ex);
+                    for (int i = 0; i < blockIndices.size(); i++) {
+                        StreamDataBlock streamDataBlock = streamDataBlocks.get(i);
+                        streamDataBlock.getDataCf().completeExceptionally(ex);
+                    }
+                    return null;
+                });
+            }
+            List<CompletableFuture<StreamObject>> streamObjectCFList = new
ArrayList<>(); + CompletableFuture> walObjectCF = null; + List objectStreamRanges = new ArrayList<>(); + for (CompactedObject compactedObject : compactionPlan.compactedObjects()) { + if (compactedObject.type() == CompactionType.COMPACT) { + objectStreamRanges = CompactionUtils.buildObjectStreamRange(compactedObject); + walObjectCF = uploader.writeWALObject(compactedObject); + } else { + streamObjectCFList.add(uploader.writeStreamObject(compactedObject)); + } + } + // wait for all stream objects and wal object part to be uploaded + try { + if (walObjectCF != null) { + // wait for all blocks to be uploaded or added to waiting list + CompletableFuture writeObjectCF = walObjectCF.join(); + // force upload all blocks still in waiting list + uploader.forceUploadWAL(); + // wait for all blocks to be uploaded + writeObjectCF.join(); + objectStreamRanges.forEach(request::addStreamRange); + } + streamObjectCFList.stream().map(CompletableFuture::join).forEach(request::addStreamObject); + } catch (Exception ex) { + LOGGER.error("Error while uploading compaction objects", ex); + uploader.reset(); + throw new IllegalArgumentException("Error while uploading compaction objects", ex); + } +// compactionPlan.streamDataBlocksMap().values().forEach(e -> e.forEach(StreamDataBlock::free)); + } + request.setObjectId(uploader.getWALObjectId()); + // set wal object id to be the first object id of compacted objects + request.setOrderId(s3ObjectMetadata.get(0).getObjectMetadata().objectId()); + request.setCompactedObjectIds(s3ObjectMetadata.stream().map(s -> s.getObjectMetadata().objectId()).collect(Collectors.toList())); + request.setObjectSize(uploader.completeWAL()); + uploader.reset(); + return request; + } +} diff --git a/core/src/main/scala/kafka/log/s3/compact/CompactionPlan.java b/core/src/main/scala/kafka/log/s3/compact/CompactionPlan.java new file mode 100644 index 0000000000..38ad0763a5 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/compact/CompactionPlan.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.log.s3.compact; + +import kafka.log.s3.compact.objects.CompactedObject; +import kafka.log.s3.compact.objects.StreamDataBlock; + +import java.util.List; +import java.util.Map; + +public class CompactionPlan { + private final List compactedObjects; + private final Map> streamDataBlocksMap; + + public CompactionPlan(List compactedObjects, + Map> streamDataBlocksMap) { + this.compactedObjects = compactedObjects; + this.streamDataBlocksMap = streamDataBlocksMap; + } + + public List compactedObjects() { + return compactedObjects; + } + + public Map> streamDataBlocksMap() { + return streamDataBlocksMap; + } +} diff --git a/core/src/main/scala/kafka/log/s3/compact/CompactionUploader.java b/core/src/main/scala/kafka/log/s3/compact/CompactionUploader.java new file mode 100644 index 0000000000..6c913d1d37 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/compact/CompactionUploader.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.compact; + +import kafka.log.s3.compact.objects.CompactedObject; +import kafka.log.s3.compact.objects.CompactionType; +import kafka.log.s3.compact.objects.StreamDataBlock; +import kafka.log.s3.compact.operator.DataBlockWriter; +import kafka.log.s3.objects.ObjectManager; +import kafka.log.s3.objects.StreamObject; +import kafka.log.s3.operator.S3Operator; +import kafka.server.KafkaConfig; +import org.apache.kafka.common.utils.ThreadUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class CompactionUploader { + private final static Logger LOGGER = LoggerFactory.getLogger(CompactionUploader.class); + private final ObjectManager objectManager; + private final TokenBucketThrottle throttle; + private final ExecutorService executorService; + private final S3Operator s3Operator; + private final KafkaConfig kafkaConfig; + private CompletableFuture walObjectIdCf = null; + private DataBlockWriter walObjectWriter = null; + + // TODO: add network outbound throttle + public CompactionUploader(ObjectManager objectManager, S3Operator s3Operator, KafkaConfig kafkaConfig) { + this.objectManager = objectManager; + this.s3Operator = s3Operator; + this.kafkaConfig = kafkaConfig; + this.throttle = new TokenBucketThrottle(kafkaConfig.s3ObjectCompactionNWOutBandwidth()); + this.executorService = Executors.newFixedThreadPool(kafkaConfig.s3ObjectCompactionUploadConcurrency(), + ThreadUtils.createThreadFactory("compaction-uploader-%d", true)); + } + + public void stop() { + this.executorService.shutdown(); + this.throttle.stop(); + } + 
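+    // Note: writeWALObject() below completes only after every data block of the COMPACT-type object
+    // has been loaded, then hands the blocks to the shared WAL object writer via prepareObjectAndWrite().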
+ public CompletableFuture> writeWALObject(CompactedObject compactedObject) { + if (compactedObject.type() != CompactionType.COMPACT) { + return CompletableFuture.failedFuture(new IllegalArgumentException("wrong compacted object type, expected COMPACT")); + } + return CompletableFuture.allOf(compactedObject.streamDataBlocks() + .stream() + .map(StreamDataBlock::getDataCf) + .toArray(CompletableFuture[]::new)) + .thenComposeAsync(v -> prepareObjectAndWrite(compactedObject), executorService) + .exceptionally(ex -> { + LOGGER.error("wal object write failed", ex); + return null; + }); + } + + private CompletableFuture> prepareObjectAndWrite(CompactedObject compactedObject) { + // no race condition, only one thread at a time will request for wal object id + if (walObjectIdCf == null) { + walObjectIdCf = this.objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)); + } + return walObjectIdCf.thenApplyAsync(objectId -> { + if (walObjectWriter == null) { + walObjectWriter = new DataBlockWriter(objectId, s3Operator, kafkaConfig.s3ObjectPartSize()); + } + List> writeFutureList = new ArrayList<>(); + for (StreamDataBlock streamDataBlock : compactedObject.streamDataBlocks()) { + writeFutureList.add(walObjectWriter.write(streamDataBlock)); + } + return CompletableFuture.allOf(writeFutureList.toArray(new CompletableFuture[0])); + }, executorService).exceptionally(ex -> { + LOGGER.error("prepare and write wal object failed", ex); + return null; + }); + } + + public CompletableFuture writeStreamObject(CompactedObject compactedObject) { + if (compactedObject.type() != CompactionType.SPLIT) { + return CompletableFuture.failedFuture(new IllegalArgumentException("wrong compacted object type, expected SPLIT")); + } + return CompletableFuture.allOf(compactedObject.streamDataBlocks() + .stream() + .map(StreamDataBlock::getDataCf) + .toArray(CompletableFuture[]::new)) + .thenComposeAsync(v -> objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)) + .thenComposeAsync(objectId -> { + DataBlockWriter dataBlockWriter = new DataBlockWriter(objectId, s3Operator, kafkaConfig.s3ObjectPartSize()); + for (StreamDataBlock streamDataBlock : compactedObject.streamDataBlocks()) { + dataBlockWriter.write(streamDataBlock); + } + long streamId = compactedObject.streamDataBlocks().get(0).getStreamId(); + long startOffset = compactedObject.streamDataBlocks().get(0).getStartOffset(); + long endOffset = compactedObject.streamDataBlocks().get(compactedObject.streamDataBlocks().size() - 1).getEndOffset(); + StreamObject streamObject = new StreamObject(); + streamObject.setObjectId(objectId); + streamObject.setStreamId(streamId); + streamObject.setStartOffset(startOffset); + streamObject.setEndOffset(endOffset); + return dataBlockWriter.close().thenApply(nil -> { + streamObject.setObjectSize(dataBlockWriter.size()); + return streamObject; + }); + }, executorService), + executorService) + .exceptionally(ex -> { + LOGGER.error("stream object write failed", ex); + return null; + }); + } + + public void forceUploadWAL() { + if (walObjectWriter == null) { + return; + } + walObjectWriter.uploadWaitingList(); + } + + public long completeWAL() { + if (walObjectWriter == null) { + return 0L; + } + walObjectWriter.close().join(); + return walObjectWriter.size(); + } + + public void reset() { + walObjectIdCf = null; + walObjectWriter = null; + } + + public long getWALObjectId() { + if (walObjectIdCf == null) { + return -1; + } + return walObjectIdCf.getNow(-1L); + } +} diff --git 
a/core/src/main/scala/kafka/log/s3/compact/CompactionUtils.java b/core/src/main/scala/kafka/log/s3/compact/CompactionUtils.java new file mode 100644 index 0000000000..0b4e5a1e63 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/compact/CompactionUtils.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.compact; + +import kafka.log.s3.compact.objects.CompactedObject; +import kafka.log.s3.compact.objects.StreamDataBlock; +import kafka.log.s3.compact.operator.DataBlockReader; +import kafka.log.s3.objects.ObjectStreamRange; + +import java.util.ArrayList; +import java.util.List; + +public class CompactionUtils { + public static List buildObjectStreamRange(CompactedObject compactedObject) { + List objectStreamRanges = new ArrayList<>(); + ObjectStreamRange currObjectStreamRange = null; + for (StreamDataBlock streamDataBlock : compactedObject.streamDataBlocks()) { + if (currObjectStreamRange == null) { + currObjectStreamRange = new ObjectStreamRange(streamDataBlock.getStreamId(), -1L, + streamDataBlock.getStartOffset(), streamDataBlock.getEndOffset()); + } else { + if (currObjectStreamRange.getStreamId() == streamDataBlock.getStreamId()) { + currObjectStreamRange.setEndOffset(streamDataBlock.getEndOffset()); + } else { + objectStreamRanges.add(currObjectStreamRange); + currObjectStreamRange = new ObjectStreamRange(streamDataBlock.getStreamId(), -1L, + streamDataBlock.getStartOffset(), streamDataBlock.getEndOffset()); + } + } + } + objectStreamRanges.add(currObjectStreamRange); + return objectStreamRanges; + } + + public static List buildBlockIndicesFromStreamDataBlock(List streamDataBlocks) { + List blockIndices = new ArrayList<>(); + for (StreamDataBlock streamDataBlock : streamDataBlocks) { + blockIndices.add(new DataBlockReader.DataBlockIndex(streamDataBlock.getBlockId(), streamDataBlock.getBlockPosition(), + streamDataBlock.getBlockSize(), streamDataBlock.getRecordCount())); + } + return blockIndices; + } +} diff --git a/core/src/main/scala/kafka/log/s3/compact/TokenBucketThrottle.java b/core/src/main/scala/kafka/log/s3/compact/TokenBucketThrottle.java new file mode 100644 index 0000000000..c445b91b28 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/compact/TokenBucketThrottle.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.compact; + +import io.netty.util.concurrent.DefaultThreadFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class TokenBucketThrottle { + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private final long tokenSize; + private long availableTokens; + + private final ScheduledExecutorService executorService; + + public TokenBucketThrottle(long tokenSize) { + this.tokenSize = tokenSize; + this.executorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("token-bucket-throttle")); + this.executorService.scheduleAtFixedRate(() -> { + try { + lock.lock(); + availableTokens = tokenSize; + condition.signalAll(); + } finally { + lock.unlock(); + } + }, 0, 1, java.util.concurrent.TimeUnit.SECONDS); + } + + public void stop() { + this.executorService.shutdown(); + } + + public long getTokenSize() { + return tokenSize; + } + + public void throttle(long size) { + try { + lock.lock(); + while (availableTokens < size) { + condition.await(); + } + availableTokens -= size; + } catch (InterruptedException ignored) { + } finally { + lock.unlock(); + } + } +} diff --git a/core/src/main/scala/kafka/log/s3/compact/objects/CompactedObject.java b/core/src/main/scala/kafka/log/s3/compact/objects/CompactedObject.java new file mode 100644 index 0000000000..990ea5c3d6 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/compact/objects/CompactedObject.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.log.s3.compact.objects; + +import java.util.List; + +public class CompactedObject { + private final CompactionType type; + private final List streamDataBlocks; + private final long size; + + public CompactedObject(CompactionType type, List streamDataBlocks, long size) { + this.type = type; + this.streamDataBlocks = streamDataBlocks; + this.size = size; + } + + public CompactionType type() { + return type; + } + + public List streamDataBlocks() { + return this.streamDataBlocks; + } + + public long size() { + return size; + } +} diff --git a/core/src/main/scala/kafka/log/s3/compact/objects/CompactedObjectBuilder.java b/core/src/main/scala/kafka/log/s3/compact/objects/CompactedObjectBuilder.java new file mode 100644 index 0000000000..15588fce1e --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/compact/objects/CompactedObjectBuilder.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.compact.objects; + +import java.util.ArrayList; +import java.util.List; + +public class CompactedObjectBuilder { + private CompactionType type; + private final List streamDataBlocks; + private int currStreamIndexHead; + private int currStreamIndexTail; + + public CompactedObjectBuilder() { + this.type = CompactionType.COMPACT; + this.streamDataBlocks = new ArrayList<>(); + this.currStreamIndexHead = -1; + this.currStreamIndexTail = -1; + } + + public CompactedObjectBuilder splitCurrentStream() { + return split(currStreamIndexHead, currStreamIndexTail); + } + + public CompactedObjectBuilder split(int start, int end) { + if (start < 0 || end > currStreamIndexTail) { + // split out of range + return new CompactedObjectBuilder(); + } + CompactedObjectBuilder builder = new CompactedObjectBuilder(); + List streamRangePositionsSubList = streamDataBlocks.subList(start, end); + for (StreamDataBlock streamRangePosition : streamRangePositionsSubList) { + builder.addStreamDataBlock(streamRangePosition); + } + builder.setType(type); + streamRangePositionsSubList.clear(); + resetCurrStreamPosition(); + return builder; + } + + private void resetCurrStreamPosition() { + currStreamIndexHead = -1; + currStreamIndexTail = -1; + long currStreamId = -1; + for (int i = 0; i < streamDataBlocks.size(); i++) { + StreamDataBlock streamDataBlock = streamDataBlocks.get(i); + if (currStreamId != streamDataBlock.getStreamId()) { + currStreamId = streamDataBlock.getStreamId(); + currStreamIndexHead = i; + } + currStreamIndexTail = i + 1; + } + } + + private List getCurrentStreamRangePositions() { + if (currStreamIndexHead == -1 || currStreamIndexTail == -1) { + return new ArrayList<>(); + } + return streamDataBlocks.subList(currStreamIndexHead, currStreamIndexTail); + } + + public CompactedObjectBuilder setType(CompactionType 
type) { + this.type = type; + return this; + } + + public CompactionType type() { + return this.type; + } + + public long lastStreamId() { + if (streamDataBlocks.isEmpty()) { + return -1; + } + return streamDataBlocks.get(streamDataBlocks.size() - 1).getStreamId(); + } + + public long lastOffset() { + if (streamDataBlocks.isEmpty()) { + return -1; + } + return streamDataBlocks.get(streamDataBlocks.size() - 1).getEndOffset(); + } + + public CompactedObjectBuilder addStreamDataBlock(StreamDataBlock streamDataBlock) { + if (streamDataBlock.getStreamId() != lastStreamId()) { + this.currStreamIndexHead = this.streamDataBlocks.size(); + } + this.streamDataBlocks.add(streamDataBlock); + this.currStreamIndexTail = this.streamDataBlocks.size(); + return this; + } + + public List streamDataBlocks() { + return this.streamDataBlocks; + } + + public long currStreamBlockSize() { + return getCurrentStreamRangePositions().stream().mapToLong(StreamDataBlock::getBlockSize).sum(); + } + + public long totalBlockSize() { + return streamDataBlocks.stream().mapToLong(StreamDataBlock::getBlockSize).sum(); + } + + public void merge(CompactedObjectBuilder other) { + if (other.type == CompactionType.SPLIT) { + // cannot merge compacted object of split type as split strategy is determined when constructing compacted objects + return; + } + this.streamDataBlocks.addAll(other.streamDataBlocks); + resetCurrStreamPosition(); + } + + public CompactedObject build() { + return new CompactedObject(type, streamDataBlocks, totalBlockSize()); + } +} diff --git a/core/src/main/scala/kafka/log/s3/compact/objects/CompactionType.java b/core/src/main/scala/kafka/log/s3/compact/objects/CompactionType.java new file mode 100644 index 0000000000..a6312990cd --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/compact/objects/CompactionType.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.compact.objects; + +public enum CompactionType { + COMPACT, + SPLIT, +} diff --git a/core/src/main/scala/kafka/log/s3/compact/objects/StreamDataBlock.java b/core/src/main/scala/kafka/log/s3/compact/objects/StreamDataBlock.java new file mode 100644 index 0000000000..076bb42d40 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/compact/objects/StreamDataBlock.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.compact.objects; + +import io.netty.buffer.ByteBuf; + +import java.util.Comparator; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +public class StreamDataBlock { + public static final Comparator STREAM_OFFSET_COMPARATOR = Comparator.comparingLong(StreamDataBlock::getStartOffset); + public static final Comparator BLOCK_POSITION_COMPARATOR = Comparator.comparingLong(StreamDataBlock::getBlockPosition); + + // Stream attributes + private final long streamId; + private final long startOffset; + private final long endOffset; + private final int blockId; + + // Object attributes + private final long objectId; + private final long blockPosition; + private final int blockSize; + private final int recordCount; + private final CompletableFuture dataCf = new CompletableFuture<>(); + + public StreamDataBlock(long streamId, long startOffset, long endOffset, int blockId, + long objectId, long blockPosition, int blockSize, int recordCount) { + this.streamId = streamId; + this.startOffset = startOffset; + this.endOffset = endOffset; + this.blockId = blockId; + this.objectId = objectId; + this.blockPosition = blockPosition; + this.blockSize = blockSize; + this.recordCount = recordCount; + } + + public long getStreamId() { + return streamId; + } + + public long getStartOffset() { + return startOffset; + } + + public long getEndOffset() { + return endOffset; + } + + public long getStreamRangeSize() { + return endOffset - startOffset; + } + + public int getBlockId() { + return blockId; + } + + public long getObjectId() { + return objectId; + } + + public long getBlockPosition() { + return blockPosition; + } + + public int getBlockSize() { + return blockSize; + } + + public int getRecordCount() { + return recordCount; + } + + public CompletableFuture getDataCf() { + return this.dataCf; + } + + public void free() { + this.dataCf.thenAccept(buf -> { + if (buf != null) { + buf.release(); + } + }); + } + + @Override + public String toString() { + return "ObjectStreamRangePosition{" + + "streamId=" + streamId + + ", startOffset=" + startOffset + + ", endOffset=" + endOffset + + ", objectId=" + objectId + + ", blockPosition=" + blockPosition + + ", blockSize=" + blockSize + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StreamDataBlock that = (StreamDataBlock) o; + return streamId == that.streamId && startOffset == that.startOffset && endOffset == that.endOffset + && blockId == that.blockId && objectId == that.objectId && blockPosition == that.blockPosition + && blockSize == that.blockSize && recordCount == that.recordCount; + } + + @Override + public int hashCode() { + return Objects.hash(streamId, startOffset, endOffset, blockId, objectId, blockPosition, blockSize, recordCount); + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/s3/compact/operator/DataBlockReader.java b/core/src/main/scala/kafka/log/s3/compact/operator/DataBlockReader.java new file mode 100644 index 0000000000..3c823bb4a7 --- /dev/null +++ 
b/core/src/main/scala/kafka/log/s3/compact/operator/DataBlockReader.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.compact.operator; + +import io.netty.buffer.ByteBuf; +import kafka.log.s3.ObjectReader; +import kafka.log.s3.compact.objects.StreamDataBlock; +import kafka.log.s3.operator.S3Operator; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +//TODO: refactor to reduce duplicate code with ObjectWriter +public class DataBlockReader { + private static final Logger LOGGER = LoggerFactory.getLogger(ObjectReader.class); + private final S3ObjectMetadata metadata; + private final String objectKey; + private final S3Operator s3Operator; + private final CompletableFuture> indexBlockCf = new CompletableFuture<>(); + + public DataBlockReader(S3ObjectMetadata metadata, S3Operator s3Operator) { + this.metadata = metadata; + this.objectKey = metadata.key(); + this.s3Operator = s3Operator; + } + + public CompletableFuture> getDataBlockIndex() { + return indexBlockCf; + } + + public void parseDataBlockIndex() { + parseDataBlockIndex(Math.max(0, metadata.objectSize() - 1024 * 1024)); + } + + public void parseDataBlockIndex(long startPosition) { + s3Operator.rangeRead(objectKey, startPosition, metadata.objectSize()) + .thenAccept(buf -> { + try { + indexBlockCf.complete(IndexBlock.parse(buf, metadata.objectSize(), metadata.objectId())); + } catch (IndexBlockParseException ex) { + parseDataBlockIndex(ex.indexBlockPosition); + } + }).exceptionally(ex -> { + // unrecoverable error, possibly read on a deleted object + LOGGER.warn("s3 range read from {} [{}, {}) failed, ex", objectKey, startPosition, metadata.objectSize(), ex); + indexBlockCf.completeExceptionally(ex); + return null; + }); + } + + public CompletableFuture> readBlocks(List blockIndices) { + CompletableFuture rangeReadCf = s3Operator.rangeRead(objectKey, blockIndices.get(0).startPosition(), + blockIndices.get(blockIndices.size() - 1).endPosition()); + return rangeReadCf.thenApply(buf -> { + List dataBlocks = new ArrayList<>(); + parseDataBlocks(buf, blockIndices, dataBlocks); + return dataBlocks; + }); + } + + private void parseDataBlocks(ByteBuf buf, List blockIndices, List dataBlocks) { + for (DataBlockIndex blockIndexEntry : blockIndices) { + int blockSize = blockIndexEntry.size; + ByteBuf blockBuf = buf.slice(buf.readerIndex(), blockSize); + buf.skipBytes(blockSize); + dataBlocks.add(new DataBlock(blockBuf, blockIndexEntry.recordCount)); + } + } + + static class IndexBlock { + static List parse(ByteBuf objectTailBuf, long objectSize, long objectId) throws 
IndexBlockParseException { + long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - 48); + int indexBlockSize = objectTailBuf.getInt(objectTailBuf.readableBytes() - 40); + if (indexBlockPosition + objectTailBuf.readableBytes() < objectSize) { + throw new IndexBlockParseException(indexBlockPosition); + } else { + int indexRelativePosition = objectTailBuf.readableBytes() - (int) (objectSize - indexBlockPosition); + ByteBuf indexBlockBuf = objectTailBuf.slice(objectTailBuf.readerIndex() + indexRelativePosition, indexBlockSize); + int blockCount = indexBlockBuf.readInt(); + ByteBuf blocks = indexBlockBuf.slice(indexBlockBuf.readerIndex(), blockCount * 16); + List dataBlockIndices = new ArrayList<>(); + for (int i = 0; i < blockCount; i++) { + long blockPosition = blocks.readLong(); + int blockSize = blocks.readInt(); + int recordCount = blocks.readInt(); + dataBlockIndices.add(new DataBlockIndex(i, blockPosition, blockSize, recordCount)); + } + indexBlockBuf.skipBytes(blockCount * 16); + ByteBuf streamRanges = indexBlockBuf.slice(indexBlockBuf.readerIndex(), indexBlockBuf.readableBytes()); + List streamDataBlocks = new ArrayList<>(); + for (int i = 0; i < blockCount; i++) { + long streamId = streamRanges.readLong(); + long startOffset = streamRanges.readLong(); + int rangeSize = streamRanges.readInt(); + int blockIndex = streamRanges.readInt(); + streamDataBlocks.add(new StreamDataBlock(streamId, startOffset, startOffset + rangeSize, blockIndex, + objectId, dataBlockIndices.get(i).startPosition, dataBlockIndices.get(i).size, dataBlockIndices.get(i).recordCount)); + } + return streamDataBlocks; + } + } + + } + + static class IndexBlockParseException extends Exception { + long indexBlockPosition; + + public IndexBlockParseException(long indexBlockPosition) { + this.indexBlockPosition = indexBlockPosition; + } + + } + + + public static class DataBlockIndex { + public static final int BLOCK_INDEX_SIZE = 8 + 4 + 4; + private final int blockId; + private final long startPosition; + private final int size; + private final int recordCount; + + public DataBlockIndex(int blockId, long startPosition, int size, int recordCount) { + this.blockId = blockId; + this.startPosition = startPosition; + this.size = size; + this.recordCount = recordCount; + } + + public int blockId() { + return blockId; + } + + public long startPosition() { + return startPosition; + } + + public long endPosition() { + return startPosition + size; + } + + public int recordCount() { + return recordCount; + } + } + + public static class DataBlock { + private final ByteBuf buf; + private final int recordCount; + + public DataBlock(ByteBuf buf, int recordCount) { + this.buf = buf; + this.recordCount = recordCount; + } + + public ByteBuf buffer() { + return buf; + } + } + +} diff --git a/core/src/main/scala/kafka/log/s3/compact/operator/DataBlockWriter.java b/core/src/main/scala/kafka/log/s3/compact/operator/DataBlockWriter.java new file mode 100644 index 0000000000..c2b222d0fd --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/compact/operator/DataBlockWriter.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.compact.operator; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import kafka.log.s3.compact.objects.StreamDataBlock; +import kafka.log.s3.operator.S3Operator; +import kafka.log.s3.operator.Writer; +import org.apache.kafka.metadata.stream.ObjectUtils; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +//TODO: refactor to reduce duplicate code with ObjectWriter +public class DataBlockWriter { + private final int partSizeThreshold; + private final List waitingUploadBlocks; + private final Map> waitingUploadBlockCfs; + private final List completedBlocks; + private IndexBlock indexBlock; + private final Writer writer; + private final long objectId; + private long nextDataBlockPosition; + private long size; + + public DataBlockWriter(long objectId, S3Operator s3Operator, int partSizeThreshold) { + this.objectId = objectId; + String objectKey = ObjectUtils.genKey(0, "todocluster", objectId); + this.partSizeThreshold = partSizeThreshold; + waitingUploadBlocks = new LinkedList<>(); + waitingUploadBlockCfs = new ConcurrentHashMap<>(); + completedBlocks = new LinkedList<>(); + writer = s3Operator.writer(objectKey); + } + + public long getObjectId() { + return objectId; + } + + public CompletableFuture write(StreamDataBlock dataBlock) { + CompletableFuture writeCf = new CompletableFuture<>(); + waitingUploadBlockCfs.put(dataBlock, writeCf); + waitingUploadBlocks.add(dataBlock); + long waitingUploadSize = waitingUploadBlocks.stream().mapToLong(StreamDataBlock::getBlockSize).sum(); + if (waitingUploadSize >= partSizeThreshold) { + uploadWaitingList(); + } + return writeCf; + } + + public void uploadWaitingList() { + CompositeByteBuf partBuf = Unpooled.compositeBuffer(); + for (StreamDataBlock block : waitingUploadBlocks) { + partBuf.addComponent(true, block.getDataCf().join()); + completedBlocks.add(block); + nextDataBlockPosition += block.getBlockSize(); + } + List blocks = new LinkedList<>(waitingUploadBlocks); + writer.write(partBuf).thenAccept(v -> { + for (StreamDataBlock block : blocks) { + waitingUploadBlockCfs.get(block).complete(null); + waitingUploadBlockCfs.remove(block); + } + }); + waitingUploadBlocks.clear(); + } + + public CompletableFuture close() { + CompositeByteBuf buf = Unpooled.compositeBuffer(); + for (StreamDataBlock block : waitingUploadBlocks) { + buf.addComponent(true, block.getDataCf().join()); + completedBlocks.add(block); + nextDataBlockPosition += block.getBlockSize(); + } + waitingUploadBlocks.clear(); + indexBlock = new IndexBlock(); + buf.addComponent(true, indexBlock.buffer()); + Footer footer = new Footer(); + buf.addComponent(true, footer.buffer()); + writer.write(buf.duplicate()); + size = indexBlock.position() + indexBlock.size() + footer.size(); + return writer.close(); + } + + public long objectId() { + return objectId; + } + + public long size() { + return size; + } + + class IndexBlock { + private final ByteBuf buf; + private final 
long position; + + public IndexBlock() { + position = nextDataBlockPosition; + buf = Unpooled.buffer(calculateIndexBlockSize()); + buf.writeInt(completedBlocks.size()); // block count + // block index + for (StreamDataBlock block : completedBlocks) { + buf.writeLong(block.getBlockPosition()); + buf.writeInt(block.getBlockSize()); + buf.writeInt(block.getRecordCount()); + } + // object stream range + for (int blockIndex = 0; blockIndex < completedBlocks.size(); blockIndex++) { + StreamDataBlock block = completedBlocks.get(blockIndex); + buf.writeLong(block.getStreamId()); + buf.writeLong(block.getStartOffset()); + buf.writeInt((int) (block.getEndOffset() - block.getStartOffset())); + buf.writeInt(blockIndex); + } + } + + private int calculateIndexBlockSize() { + return 4 + completedBlocks.size() * 40; + } + + public ByteBuf buffer() { + return buf.duplicate(); + } + + public long position() { + return position; + } + + public int size() { + return buf.readableBytes(); + } + } + + class Footer { + private static final int FOOTER_SIZE = 48; + private static final long MAGIC = 0x88e241b785f4cff7L; + private final ByteBuf buf; + + public Footer() { + buf = Unpooled.buffer(FOOTER_SIZE); + buf.writeLong(indexBlock.position()); + buf.writeInt(indexBlock.size()); + buf.writeZero(40 - 8 - 4); + buf.writeLong(MAGIC); + } + + public ByteBuf buffer() { + return buf.duplicate(); + } + + public int size() { + return FOOTER_SIZE; + } + + } +} diff --git a/core/src/main/scala/kafka/log/s3/exceptions/IndexBlockParseException.java b/core/src/main/scala/kafka/log/s3/exceptions/IndexBlockParseException.java new file mode 100644 index 0000000000..2a257ac8ac --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/exceptions/IndexBlockParseException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.log.s3.exceptions; + +public class IndexBlockParseException extends Exception { + private final long indexBlockPosition; + + public IndexBlockParseException(long indexBlockPosition) { + this.indexBlockPosition = indexBlockPosition; + } + + public long getIndexBlockPosition() { + return indexBlockPosition; + } +} diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java index e20f209568..9ebc2e3960 100644 --- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java @@ -51,6 +51,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -59,16 +60,16 @@ public class MemoryMetadataManager implements StreamManager, ObjectManager { private static final int MOCK_BROKER_ID = 0; private static final Logger LOGGER = LoggerFactory.getLogger(MemoryMetadataManager.class); private final EventDriver eventDriver; - private volatile long nextAssignedObjectId = 0; + private final AtomicLong nextAssignedObjectId = new AtomicLong(0L); private final Map objectsMetadata; - private volatile long nextAssignedStreamId = 0; + private final AtomicLong nextAssignedStreamId = new AtomicLong(0L); private final Map streamsMetadata; private final Map brokerWALMetadata; private static class MemoryStreamMetadata { private final long streamId; - private StreamState state = StreamState.CLOSED; + private StreamState state; private long epoch = S3StreamConstant.INIT_EPOCH; private long startOffset = S3StreamConstant.INIT_START_OFFSET; private long endOffset = S3StreamConstant.INIT_END_OFFSET; @@ -115,7 +116,7 @@ public MemoryMetadataManager() { public CompletableFuture submitEvent(Supplier eventHandler) { CompletableFuture cb = new CompletableFuture<>(); - MemoryMetadataEvent event = new MemoryMetadataEvent(cb, eventHandler); + MemoryMetadataEvent event = new MemoryMetadataEvent<>(cb, eventHandler); if (!eventDriver.submit(event)) { throw new RuntimeException("Offer event failed"); } @@ -133,13 +134,14 @@ public void start() { @Override public CompletableFuture prepareObject(int count, long ttl) { return this.submitEvent(() -> { - long objectRangeStart = this.nextAssignedObjectId; + List objectIds = new ArrayList<>(); for (int i = 0; i < count; i++) { - long objectId = this.nextAssignedObjectId++; + long objectId = this.nextAssignedObjectId.getAndIncrement(); + objectIds.add(objectId); S3Object object = prepareObject(objectId, ttl); this.objectsMetadata.put(objectId, object); } - return objectRangeStart; + return objectIds.get(0); }); } @@ -168,7 +170,7 @@ public CompletableFuture commitWALObject(CommitWALObjec MemoryBrokerWALMetadata walMetadata = this.brokerWALMetadata.computeIfAbsent(MOCK_BROKER_ID, k -> new MemoryBrokerWALMetadata(k)); Map> index = new HashMap<>(); - streamRanges.stream().forEach(range -> { + streamRanges.forEach(range -> { long streamId = range.getStreamId(); long startOffset = range.getStartOffset(); long endOffset = range.getEndOffset(); @@ -281,7 +283,7 @@ public CompletableFuture> getOpeningStreams() { @Override public CompletableFuture createStream() { return this.submitEvent(() -> { - long streamId = this.nextAssignedStreamId++; + long streamId = this.nextAssignedStreamId.getAndIncrement(); 
this.streamsMetadata.put(streamId, new MemoryStreamMetadata(streamId)); return streamId; @@ -383,7 +385,7 @@ public EventDriver() { } public void start() { - this.service.submit(this::run); + this.service.submit(this); } public void stop() { diff --git a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java index 8ed6a4d477..dc221e0d26 100644 --- a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java @@ -30,6 +30,7 @@ import org.apache.kafka.metadata.stream.S3Object; import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.S3StreamConstant; +import org.apache.kafka.metadata.stream.S3WALObjectMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.raft.OffsetAndEpoch; @@ -98,6 +99,18 @@ private void onImageChanged(MetadataDelta delta, MetadataImage newImage) { } } + public List getWALObjects() { + synchronized (this) { + return this.streamsImage.getWALObjects(config.brokerId()).stream() + .map(walObject -> { + S3Object s3Object = this.objectsImage.getObjectMetadata(walObject.objectId()); + S3ObjectMetadata metadata = new S3ObjectMetadata(walObject.objectId(), s3Object.getObjectSize(), walObject.objectType()); + return new S3WALObjectMetadata(walObject, metadata); + }) + .collect(Collectors.toList()); + } + } + // must access thread safe private List removePendingTasks() { if (this.pendingGetObjectsTasks == null || this.pendingGetObjectsTasks.isEmpty()) { diff --git a/core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java b/core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java index d217098a91..c0b78513c8 100644 --- a/core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java +++ b/core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java @@ -207,7 +207,7 @@ private void init() { } @Override - public void write(ByteBuf part) { + public CompletableFuture write(ByteBuf part) { long start = System.nanoTime(); OBJECT_UPLOAD_SIZE.inc(part.readableBytes()); int partNumber = nextPartNumber.getAndIncrement(); @@ -218,6 +218,7 @@ public void write(ByteBuf part) { PART_UPLOAD_COST.update(System.nanoTime() - start); part.release(); }); + return partCf; } private void write0(String uploadId, int partNumber, ByteBuf part, CompletableFuture partCf) { diff --git a/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java b/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java index 7cc2276bb1..856789008d 100644 --- a/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java +++ b/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java @@ -20,13 +20,14 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import kafka.log.es.FutureUtil; +import software.amazon.awssdk.services.s3.model.CompletedPart; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; public class MemoryS3Operator implements S3Operator { - private final Map storage = new HashMap<>(); + private final Map storage = new ConcurrentHashMap<>(); @Override public void close() { @@ -54,8 +55,9 @@ public Writer writer(String path) { storage.put(path, buf); return new Writer() { @Override - public void write(ByteBuf part) { + public CompletableFuture write(ByteBuf part) 
{ buf.writeBytes(part); + return CompletableFuture.completedFuture(null); } @Override diff --git a/core/src/main/scala/kafka/log/s3/operator/Writer.java b/core/src/main/scala/kafka/log/s3/operator/Writer.java index b311f5e77e..988e35dbde 100644 --- a/core/src/main/scala/kafka/log/s3/operator/Writer.java +++ b/core/src/main/scala/kafka/log/s3/operator/Writer.java @@ -18,6 +18,7 @@ package kafka.log.s3.operator; import io.netty.buffer.ByteBuf; +import software.amazon.awssdk.services.s3.model.CompletedPart; import java.util.concurrent.CompletableFuture; @@ -30,7 +31,7 @@ public interface Writer { * * @param part object part. */ - void write(ByteBuf part); + CompletableFuture write(ByteBuf part); /** * Copy a part of the object. diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 2994d54046..6dc1ae4daf 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -309,6 +309,16 @@ object Defaults { val QuorumLingerMs = RaftConfig.DEFAULT_QUORUM_LINGER_MS val QuorumRequestTimeoutMs = RaftConfig.DEFAULT_QUORUM_REQUEST_TIMEOUT_MS val QuorumRetryBackoffMs = RaftConfig.DEFAULT_QUORUM_RETRY_BACKOFF_MS + + /** ********* Kafka on S3 Configuration *********/ + val S3ObjectCompactionInterval: Int = 20 // 20min + val S3ObjectCompactionCacheSize: Long = 2 * 1024 * 1024 * 1024 // 2GB + val S3ObjectCompactionNWInBandwidth: Long = 50 * 1024 * 1024 // 50MB/s + val S3ObjectCompactionNWOutBandwidth: Long = 50 * 1024 * 1024 // 50MB/s + val S3ObjectCompactionUploadConcurrency: Int = 8 + val S3ObjectCompactionExecutionScoreThreshold: Double = 0.5 + val S3ObjectCompactionStreamSplitSize: Long = 16 * 1024 * 1024 // 16MB + val S3ObjectCompactionForceSplitPeriod: Int = 120 // 120min } object KafkaConfig { @@ -691,6 +701,14 @@ object KafkaConfig { val S3StreamObjectCompactionLivingTimeThresholdProp = "s3.stream.object.compaction.living.time.threshold" val S3ControllerRequestRetryMaxCountProp = "s3.controller.request.retry.max.count" val S3ControllerRequestRetryBaseDelayMsProp = "s3.controller.request.retry.base.delay.ms" + val S3ObjectCompactionIntervalProp = "s3.object.compaction.interval.minutes" + val S3ObjectCompactionCacheSizeProp = "s3.object.compaction.cache.size" + val S3ObjectCompactionNWInBandwidthProp = "s3.object.compaction.network.in.bandwidth" + val S3ObjectCompactionNWOutBandwidthProp = "s3.object.compaction.network.out.bandwidth" + val S3ObjectCompactionUploadConcurrencyProp = "s3.object.compaction.upload.concurrency" + val S3ObjectCompactionExecutionScoreThresholdProp = "s3.object.compaction.execution.score.threshold" + val S3ObjectCompactionStreamSplitSizeProp = "s3.object.compaction.stream.split.size" + val S3ObjectCompactionForceSplitPeriodProp = "s3.object.compaction.force.split.time" val S3EndpointDoc = "The S3 endpoint, ex. https://s3.{region}.amazonaws.com." val S3RegionDoc = "The S3 region, ex. us-east-1." @@ -708,6 +726,14 @@ object KafkaConfig { val S3StreamObjectCompactionLivingTimeThresholdDoc = "The S3 stream object compaction living time threshold in hours." val S3ControllerRequestRetryMaxCountDoc = "The S3 controller request retry max count." val S3ControllerRequestRetryBaseDelayMsDoc = "The S3 controller request retry base delay in milliseconds." + val S3ObjectCompactionIntervalDoc = "The execution interval of S3 object compaction in minutes." + val S3ObjectCompactionCacheSizeDoc = "The S3 object compaction cache size in Bytes." 
+ val S3ObjectCompactionNWInBandwidthDoc = "The S3 object compaction network in bandwidth in Bytes/s." + val S3ObjectCompactionNWOutBandwidthDoc = "The S3 object compaction network out bandwidth in Bytes/s." + val S3ObjectCompactionUploadConcurrencyDoc = "The S3 object compaction upload concurrency." + val S3ObjectCompactionExecutionScoreThresholdDoc = "The S3 object compaction execution score threshold." + val S3ObjectCompactionStreamSplitSizeDoc = "The S3 object compaction stream split size threshold in Bytes." + val S3ObjectCompactionForceSplitPeriodDoc = "The S3 object compaction force split period in minutes." // Kafka on S3 inject end @@ -1523,6 +1549,14 @@ object KafkaConfig { .define(S3StreamObjectCompactionLivingTimeThresholdProp, INT, 1, MEDIUM, S3StreamObjectCompactionLivingTimeThresholdDoc) .define(S3ControllerRequestRetryMaxCountProp, INT, 5, MEDIUM, S3ControllerRequestRetryMaxCountDoc) .define(S3ControllerRequestRetryBaseDelayMsProp, LONG, 500, MEDIUM, S3ControllerRequestRetryBaseDelayMsDoc) + .define(S3ObjectCompactionIntervalProp, INT, Defaults.S3ObjectCompactionInterval, MEDIUM, S3ObjectCompactionIntervalDoc) + .define(S3ObjectCompactionCacheSizeProp, LONG, Defaults.S3ObjectCompactionCacheSize, MEDIUM, S3ObjectCompactionCacheSizeDoc) + .define(S3ObjectCompactionNWInBandwidthProp, LONG, Defaults.S3ObjectCompactionNWInBandwidth, MEDIUM, S3ObjectCompactionNWInBandwidthDoc) + .define(S3ObjectCompactionNWOutBandwidthProp, LONG, Defaults.S3ObjectCompactionNWOutBandwidth, MEDIUM, S3ObjectCompactionNWOutBandwidthDoc) + .define(S3ObjectCompactionUploadConcurrencyProp, INT, Defaults.S3ObjectCompactionUploadConcurrency, MEDIUM, S3ObjectCompactionUploadConcurrencyDoc) + .define(S3ObjectCompactionExecutionScoreThresholdProp, DOUBLE, Defaults.S3ObjectCompactionExecutionScoreThreshold, MEDIUM, S3ObjectCompactionExecutionScoreThresholdDoc) + .define(S3ObjectCompactionStreamSplitSizeProp, LONG, Defaults.S3ObjectCompactionStreamSplitSize, MEDIUM, S3ObjectCompactionStreamSplitSizeDoc) + .define(S3ObjectCompactionForceSplitPeriodProp, INT, Defaults.S3ObjectCompactionForceSplitPeriod, MEDIUM, S3ObjectCompactionForceSplitPeriodDoc) // Kafka on S3 inject end } @@ -2075,6 +2109,14 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val s3ControllerRequestRetryBaseDelayMs = getLong(KafkaConfig.S3ControllerRequestRetryBaseDelayMsProp) // TODO: ensure incremental epoch => Store epoch in disk, if timestamp flip back, we could use disk epoch to keep the incremental epoch. 
val brokerEpoch = System.currentTimeMillis() + val s3ObjectCompactionInterval = getInt(KafkaConfig.S3ObjectCompactionIntervalProp) + val s3ObjectCompactionCacheSize = getLong(KafkaConfig.S3ObjectCompactionCacheSizeProp) + val s3ObjectCompactionNWInBandwidth = getLong(KafkaConfig.S3ObjectCompactionNWInBandwidthProp) + val s3ObjectCompactionNWOutBandwidth = getLong(KafkaConfig.S3ObjectCompactionNWOutBandwidthProp) + val s3ObjectCompactionUploadConcurrency = getInt(KafkaConfig.S3ObjectCompactionUploadConcurrencyProp) + val s3ObjectCompactionExecutionScoreThreshold = getDouble(KafkaConfig.S3ObjectCompactionExecutionScoreThresholdProp) + val s3ObjectCompactionStreamSplitSize = getLong(KafkaConfig.S3ObjectCompactionStreamSplitSizeProp) + val s3ObjectCompactionForceSplitPeriod = getInt(KafkaConfig.S3ObjectCompactionForceSplitPeriodProp) // Kafka on S3 inject end def addReconfigurable(reconfigurable: Reconfigurable): Unit = { diff --git a/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java b/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java index 5c895db7d4..5019889eef 100644 --- a/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java +++ b/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java @@ -20,6 +20,7 @@ import kafka.log.s3.metadata.StreamMetadataManager; import kafka.log.s3.metadata.StreamMetadataManager.StreamMetadataListener; import kafka.server.BrokerServer; +import kafka.server.KafkaConfig; import kafka.server.metadata.BrokerMetadataListener; import kafka.server.metadata.KRaftMetadataCache; import org.apache.kafka.image.BrokerS3WALMetadataImage; @@ -31,9 +32,15 @@ import org.apache.kafka.metadata.stream.InRangeObjects; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3Object; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.S3ObjectState; +import org.apache.kafka.metadata.stream.S3ObjectType; import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.S3StreamObject; +import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.S3WALObjectMetadata; +import org.apache.kafka.metadata.stream.SortedWALObjectsList; +import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.metadata.stream.StreamState; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -42,6 +49,7 @@ import org.mockito.Mockito; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -60,6 +68,7 @@ public class StreamMetadataManagerTest { private static final int BROKER1 = 1; private static final long STREAM0 = 0; private static final long STREAM1 = 1; + private static final long STREAM2 = 2; private BrokerServer mockBroker; private KRaftMetadataCache mockMetadataCache; @@ -79,7 +88,9 @@ public void setUp() { return null; }).when(this.mockBrokerMetadataListener).registerStreamMetadataListener(any()); Mockito.when(this.mockMetadataCache.currentImage()).thenReturn(MetadataImage.EMPTY); - this.manager = new StreamMetadataManager(this.mockBroker, null); + KafkaConfig config = Mockito.mock(KafkaConfig.class); + Mockito.when(config.brokerId()).thenReturn(BROKER0); + this.manager = new StreamMetadataManager(this.mockBroker, config); } private static MetadataImage image0; @@ -99,8 +110,15 @@ public void setUp() { Map streamObjects = Map.of( 0L, new S3StreamObject(0L, STREAM0,
10L, 100L, S3StreamConstant.INVALID_TS)); S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects); + + BrokerS3WALMetadataImage walMetadataImage0 = new BrokerS3WALMetadataImage(BROKER0, new SortedWALObjectsList(List.of( + new S3WALObject(1L, BROKER0, Map.of( + STREAM1, List.of(new StreamOffsetRange(STREAM1, 0L, 100L))), 1L), + new S3WALObject(2L, BROKER0, Map.of( + STREAM2, List.of(new StreamOffsetRange(STREAM2, 0L, 100L))), 1L)))); + S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), - Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); + Map.of(BROKER0, walMetadataImage0)); image0 = new MetadataImage(new MetadataProvenance(0, 0, 0), null, null, null, null, null, null, null, streamsImage, objectsImage, null); ranges = new HashMap<>(ranges); @@ -194,4 +212,17 @@ public void testFetch() throws Exception { } + @Test + public void testGetWALObjects() { + this.streamMetadataListener.onChange(null, image0); + List objectMetadata = this.manager.getWALObjects(); + List expected = List.of(new S3ObjectMetadata(1L, 128, S3ObjectType.WAL), + new S3ObjectMetadata(2L, 128, S3ObjectType.WAL)); + // compare objectMetadata with expected + assertEquals(expected.size(), objectMetadata.size()); + for (int i = 0; i < expected.size(); i++) { + assertEquals(expected.get(i), objectMetadata.get(i).getObjectMetadata()); + } + } + } diff --git a/core/src/test/java/kafka/log/s3/compact/CompactionAnalyzerTest.java b/core/src/test/java/kafka/log/s3/compact/CompactionAnalyzerTest.java new file mode 100644 index 0000000000..6ed80b78fe --- /dev/null +++ b/core/src/test/java/kafka/log/s3/compact/CompactionAnalyzerTest.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.log.s3.compact; + +import kafka.log.s3.compact.objects.CompactedObject; +import kafka.log.s3.compact.objects.CompactedObjectBuilder; +import kafka.log.s3.compact.objects.CompactionType; +import kafka.log.s3.compact.objects.StreamDataBlock; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; +import org.apache.kafka.metadata.stream.S3ObjectType; +import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.S3WALObjectMetadata; +import org.apache.kafka.metadata.stream.StreamOffsetRange; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Timeout(30) +@Tag("S3Unit") +public class CompactionAnalyzerTest extends CompactionTestBase { + + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + } + + @AfterEach + public void tearDown() { + super.tearDown(); + } + + @Test + public void testReadObjectIndices() { + CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(CACHE_SIZE, EXECUTION_SCORE_THRESHOLD, STREAM_SPLIT_SIZE, s3Operator); + List streamDataBlocks = compactionAnalyzer.blockWaitObjectIndices(S3_WAL_OBJECT_METADATA_LIST); + List expectedBlocks = List.of( + new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1), + new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1), + new StreamDataBlock(STREAM_2, 30, 60, 2, OBJECT_0, -1, -1, 1), + new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1), + new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1), + new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1), + new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)); + for (int i = 0; i < streamDataBlocks.size(); i++) { + Assertions.assertTrue(compare(streamDataBlocks.get(i), expectedBlocks.get(i))); + } + } + + @Test + public void testShouldCompact() { + CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(CACHE_SIZE, EXECUTION_SCORE_THRESHOLD, STREAM_SPLIT_SIZE, s3Operator); + List streamDataBlocks = compactionAnalyzer.blockWaitObjectIndices(S3_WAL_OBJECT_METADATA_LIST); + Assertions.assertTrue(compactionAnalyzer.shouldCompact(streamDataBlocks)); + } + + @Test + public void testSortStreamRangePositions() { + CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(CACHE_SIZE, EXECUTION_SCORE_THRESHOLD, STREAM_SPLIT_SIZE, s3Operator); + List streamDataBlocks = compactionAnalyzer.blockWaitObjectIndices(S3_WAL_OBJECT_METADATA_LIST); + List sortedStreamDataBlocks = compactionAnalyzer.sortStreamRangePositions(streamDataBlocks); + List expectedBlocks = List.of( + new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1), + new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1), + new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1), + new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1), + new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1), + new StreamDataBlock(STREAM_2, 30, 60, 2, OBJECT_0, -1, -1, 1), + new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)); + for (int i = 0; i < sortedStreamDataBlocks.size(); i++) { + Assertions.assertTrue(compare(sortedStreamDataBlocks.get(i), expectedBlocks.get(i))); + } + } + + @Test + public void testBuildCompactedObject1() { + CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(CACHE_SIZE, 
EXECUTION_SCORE_THRESHOLD, 100, s3Operator); + List compactedObjectBuilders = compactionAnalyzer.buildCompactedObjects(S3_WAL_OBJECT_METADATA_LIST); + List expectedCompactedObject = List.of( + new CompactedObjectBuilder() + .setType(CompactionType.COMPACT) + .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1)) + .addStreamDataBlock(new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1)), + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1)) + .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)), + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1)), + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, 2, OBJECT_0, -1, -1, 1)), + new CompactedObjectBuilder() + .setType(CompactionType.COMPACT) + .addStreamDataBlock(new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1))); + for (int i = 0; i < compactedObjectBuilders.size(); i++) { + Assertions.assertTrue(compare(compactedObjectBuilders.get(i), expectedCompactedObject.get(i))); + } + } + + @Test + public void testBuildCompactedObject2() { + CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(CACHE_SIZE, EXECUTION_SCORE_THRESHOLD, 30, s3Operator); + List compactedObjectBuilders = compactionAnalyzer.buildCompactedObjects(S3_WAL_OBJECT_METADATA_LIST); + List expectedCompactedObject = List.of( + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1)) + .addStreamDataBlock(new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1)), + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1)) + .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)), + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1)), + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, 2, OBJECT_0, -1, -1, 1)), + new CompactedObjectBuilder() + .setType(CompactionType.COMPACT) + .addStreamDataBlock(new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1))); + for (int i = 0; i < compactedObjectBuilders.size(); i++) { + Assertions.assertTrue(compare(compactedObjectBuilders.get(i), expectedCompactedObject.get(i))); + } + } + + @Test + public void testCompactionPlans1() { + CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(CACHE_SIZE, EXECUTION_SCORE_THRESHOLD, 100, s3Operator); + List compactionPlans = compactionAnalyzer.analyze(S3_WAL_OBJECT_METADATA_LIST); + Assertions.assertEquals(1, compactionPlans.size()); + List expectCompactedObjects = List.of( + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1)) + .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)) + .build(), + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1)) + .build(), + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new 
StreamDataBlock(STREAM_2, 30, 60, 2, OBJECT_0, -1, -1, 1)) + .build(), + new CompactedObjectBuilder() + .setType(CompactionType.COMPACT) + .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1)) + .addStreamDataBlock(new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1)) + .addStreamDataBlock(new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)) + .build()); + Map> expectObjectStreamDataBlocks = Map.of( + OBJECT_0, List.of( + new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1), + new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1), + new StreamDataBlock(STREAM_2, 30, 60, 2, OBJECT_0, -1, -1, 1)), + OBJECT_1, List.of( + new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1), + new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)), + OBJECT_2, List.of( + new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1), + new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1))); + CompactionPlan compactionPlan = compactionPlans.get(0); + for (int i = 0; i < compactionPlan.compactedObjects().size(); i++) { + Assertions.assertTrue(compare(compactionPlan.compactedObjects().get(i), expectCompactedObjects.get(i))); + } + for (Long objectId : compactionPlan.streamDataBlocksMap().keySet()) { + Assertions.assertTrue(compare(compactionPlan.streamDataBlocksMap().get(objectId), expectObjectStreamDataBlocks.get(objectId))); + } + } + + private void checkCompactionPlan2(List compactionPlans) { + Assertions.assertEquals(2, compactionPlans.size()); + + // first iteration + List expectCompactedObjects = List.of( + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1)) + .addStreamDataBlock(new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)) + .build(), + new CompactedObjectBuilder() + .setType(CompactionType.COMPACT) + .addStreamDataBlock(new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1)) + .addStreamDataBlock(new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1)) + .build()); + Map> expectObjectStreamDataBlocks = Map.of( + OBJECT_0, List.of( + new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1), + new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1)), + OBJECT_1, List.of( + new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1), + new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1))); + CompactionPlan compactionPlan = compactionPlans.get(0); + for (int i = 0; i < compactionPlan.compactedObjects().size(); i++) { + Assertions.assertTrue(compare(compactionPlan.compactedObjects().get(i), expectCompactedObjects.get(i))); + } + for (Long objectId : compactionPlan.streamDataBlocksMap().keySet()) { + Assertions.assertTrue(compare(compactionPlan.streamDataBlocksMap().get(objectId), expectObjectStreamDataBlocks.get(objectId))); + } + + // second iteration + expectCompactedObjects = List.of( + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1)) + .build(), + new CompactedObjectBuilder() + .setType(CompactionType.SPLIT) + .addStreamDataBlock(new StreamDataBlock(STREAM_2, 30, 60, 2, OBJECT_0, -1, -1, 1)) + .build(), + new CompactedObjectBuilder() + .setType(CompactionType.COMPACT) + .addStreamDataBlock(new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)) + .build()); + expectObjectStreamDataBlocks = Map.of( + OBJECT_0, List.of( + new StreamDataBlock(STREAM_2, 30, 60, 2, 
OBJECT_0, -1, -1, 1)), + OBJECT_2, List.of( + new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1), + new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1))); + compactionPlan = compactionPlans.get(1); + for (int i = 0; i < compactionPlan.compactedObjects().size(); i++) { + Assertions.assertTrue(compare(compactionPlan.compactedObjects().get(i), expectCompactedObjects.get(i))); + } + for (Long objectId : compactionPlan.streamDataBlocksMap().keySet()) { + Assertions.assertTrue(compare(compactionPlan.streamDataBlocksMap().get(objectId), expectObjectStreamDataBlocks.get(objectId))); + } + } + + @Test + public void testCompactionPlans2() { + CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(300, EXECUTION_SCORE_THRESHOLD, 100, s3Operator); + List compactionPlans = compactionAnalyzer.analyze(S3_WAL_OBJECT_METADATA_LIST); + checkCompactionPlan2(compactionPlans); + } + + @Test + public void testCompactionPlansWithInvalidObject() { + CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(300, EXECUTION_SCORE_THRESHOLD, 100, s3Operator); + List s3ObjectMetadata = new ArrayList<>(S3_WAL_OBJECT_METADATA_LIST); + s3ObjectMetadata.add(new S3WALObjectMetadata(new S3WALObject(100, 0, Map.of( + STREAM_2, List.of(new StreamOffsetRange(STREAM_2, 1000, 1200)) + ), 0), new S3ObjectMetadata(100, 0, S3ObjectType.WAL))); + List compactionPlans = compactionAnalyzer.analyze(s3ObjectMetadata); + checkCompactionPlan2(compactionPlans); + } +} diff --git a/core/src/test/java/kafka/log/s3/compact/CompactionManagerTest.java b/core/src/test/java/kafka/log/s3/compact/CompactionManagerTest.java new file mode 100644 index 0000000000..8de7d4627d --- /dev/null +++ b/core/src/test/java/kafka/log/s3/compact/CompactionManagerTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.log.s3.compact; + +import kafka.log.s3.compact.objects.CompactionType; +import kafka.log.s3.compact.objects.StreamDataBlock; +import kafka.log.s3.metadata.StreamMetadataManager; +import kafka.log.s3.objects.CommitWALObjectRequest; +import kafka.server.KafkaConfig; +import org.apache.kafka.metadata.stream.S3WALObjectMetadata; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.mockito.Mockito; + +import java.util.List; +import java.util.stream.Collectors; + +@Timeout(30) +@Tag("S3Unit") +public class CompactionManagerTest extends CompactionTestBase { + private static final int BROKER0 = 0; + private StreamMetadataManager streamMetadataManager; + private CompactionAnalyzer compactionAnalyzer; + private CompactionManager compactionManager; + + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + KafkaConfig config = Mockito.mock(KafkaConfig.class); + Mockito.when(config.brokerId()).thenReturn(BROKER0); + Mockito.when(config.s3ObjectCompactionNWInBandwidth()).thenReturn(500L); + Mockito.when(config.s3ObjectCompactionNWOutBandwidth()).thenReturn(500L); + Mockito.when(config.s3ObjectCompactionUploadConcurrency()).thenReturn(3); + Mockito.when(config.s3ObjectPartSize()).thenReturn(100); + Mockito.when(config.s3ObjectCompactionCacheSize()).thenReturn(300L); + Mockito.when(config.s3ObjectCompactionExecutionScoreThreshold()).thenReturn(0.5); + Mockito.when(config.s3ObjectCompactionStreamSplitSize()).thenReturn(100L); + streamMetadataManager = Mockito.mock(StreamMetadataManager.class); + Mockito.when(streamMetadataManager.getWALObjects()).thenReturn(S3_WAL_OBJECT_METADATA_LIST); + compactionAnalyzer = new CompactionAnalyzer(config.s3ObjectCompactionCacheSize(), + config.s3ObjectCompactionExecutionScoreThreshold(), config.s3ObjectCompactionStreamSplitSize(), s3Operator); + compactionManager = new CompactionManager(config, objectManager, streamMetadataManager, s3Operator); + } + + @AfterEach + public void tearDown() { + super.tearDown(); + } + + @Test + public void testCompact() { + List s3ObjectMetadata = this.streamMetadataManager.getWALObjects(); + List compactionPlans = this.compactionAnalyzer.analyze(s3ObjectMetadata); + Assertions.assertEquals(2, compactionPlans.size()); + CommitWALObjectRequest request = compactionManager.buildCompactRequest(compactionPlans, s3ObjectMetadata); + + Assertions.assertEquals(List.of(OBJECT_0, OBJECT_1, OBJECT_2), request.getCompactedObjectIds()); + Assertions.assertEquals(OBJECT_0, request.getOrderId()); + Assertions.assertTrue(request.getObjectId() > OBJECT_2); + request.getStreamObjects().forEach(s -> Assertions.assertTrue(s.getObjectId() > OBJECT_2)); + Assertions.assertEquals(3, request.getStreamObjects().size()); + Assertions.assertEquals(2, request.getStreamRanges().size()); + + List walStreamDataBlocks = compactionPlans.stream() + .map(p -> p.compactedObjects().stream() + .filter(c -> c.type() == CompactionType.COMPACT) + .flatMap(c -> c.streamDataBlocks().stream()).collect(Collectors.toList())) + .flatMap(List::stream).collect(Collectors.toList()); + long expectedWALSize = calculateObjectSize(walStreamDataBlocks); + Assertions.assertEquals(expectedWALSize, request.getObjectSize()); + } + + @Test + public void testCompactNoneExistObjects() { + List s3ObjectMetadata = this.streamMetadataManager.getWALObjects(); + List 
compactionPlans = this.compactionAnalyzer.analyze(s3ObjectMetadata); + s3Operator.delete(s3ObjectMetadata.get(0).getObjectMetadata().key()).join(); + + Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> compactionManager.buildCompactRequest(compactionPlans, s3ObjectMetadata)); + } +} diff --git a/core/src/test/java/kafka/log/s3/compact/CompactionTestBase.java b/core/src/test/java/kafka/log/s3/compact/CompactionTestBase.java new file mode 100644 index 0000000000..857265e46f --- /dev/null +++ b/core/src/test/java/kafka/log/s3/compact/CompactionTestBase.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.compact; + +import kafka.log.s3.ObjectWriter; +import kafka.log.s3.TestUtils; +import kafka.log.s3.compact.objects.CompactedObject; +import kafka.log.s3.compact.objects.CompactedObjectBuilder; +import kafka.log.s3.compact.objects.StreamDataBlock; +import kafka.log.s3.memory.MemoryMetadataManager; +import kafka.log.s3.model.StreamRecordBatch; +import kafka.log.s3.operator.MemoryS3Operator; +import kafka.log.s3.operator.S3Operator; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; +import org.apache.kafka.metadata.stream.S3ObjectType; +import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.S3WALObjectMetadata; +import org.apache.kafka.metadata.stream.StreamOffsetRange; +import org.junit.jupiter.api.Assertions; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class CompactionTestBase { + protected static final int BROKER_0 = 0; + protected static final long STREAM_0 = 0; + protected static final long STREAM_1 = 1; + protected static final long STREAM_2 = 2; + protected static final long OBJECT_0 = 0; + protected static final long OBJECT_1 = 1; + protected static final long OBJECT_2 = 2; + protected static final long CACHE_SIZE = 1024; + protected static final double EXECUTION_SCORE_THRESHOLD = 0.5; + protected static final long STREAM_SPLIT_SIZE = 30; + protected static final List S3_WAL_OBJECT_METADATA_LIST = new ArrayList<>(); + protected MemoryMetadataManager objectManager; + protected S3Operator s3Operator; + + public void setUp() throws Exception { + objectManager = new MemoryMetadataManager(); + objectManager.start(); + s3Operator = new MemoryS3Operator(); + // stream data for object 0 + objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).thenAccept(objectId -> { + Assertions.assertEquals(OBJECT_0, objectId); + ObjectWriter objectWriter = new ObjectWriter(objectId, s3Operator, 1024, 1024); + StreamRecordBatch r1 = new StreamRecordBatch(STREAM_0, 0, 0, 20, TestUtils.random(20)); + StreamRecordBatch r2 = new StreamRecordBatch(STREAM_1, 0, 30, 30, 
TestUtils.random(30)); + StreamRecordBatch r3 = new StreamRecordBatch(STREAM_2, 0, 30, 30, TestUtils.random(30)); + objectWriter.write(STREAM_0, List.of(r1)); + objectWriter.write(STREAM_1, List.of(r2)); + objectWriter.write(STREAM_2, List.of(r3)); + objectWriter.close().join(); + S3ObjectMetadata objectMetadata = new S3ObjectMetadata(OBJECT_0, objectWriter.size(), S3ObjectType.WAL); + Map> streamsIndex = Map.of( + STREAM_0, List.of(new StreamOffsetRange(STREAM_0, 0, 20)), + STREAM_1, List.of(new StreamOffsetRange(STREAM_1, 30, 60)), + STREAM_2, List.of(new StreamOffsetRange(STREAM_2, 30, 60)) + ); + S3WALObject walObject = new S3WALObject(OBJECT_0, BROKER_0, streamsIndex, OBJECT_0); + S3_WAL_OBJECT_METADATA_LIST.add(new S3WALObjectMetadata(walObject, objectMetadata)); + }).join(); + + // stream data for object 1 + objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).thenAccept(objectId -> { + Assertions.assertEquals(OBJECT_1, objectId); + ObjectWriter objectWriter = new ObjectWriter(OBJECT_1, s3Operator, 1024, 1024); + StreamRecordBatch r4 = new StreamRecordBatch(STREAM_0, 0, 20, 5, TestUtils.random(5)); + StreamRecordBatch r5 = new StreamRecordBatch(STREAM_1, 0, 60, 60, TestUtils.random(60)); + objectWriter.write(STREAM_0, List.of(r4)); + objectWriter.write(STREAM_1, List.of(r5)); + objectWriter.close().join(); + S3ObjectMetadata objectMetadata = new S3ObjectMetadata(OBJECT_1, objectWriter.size(), S3ObjectType.WAL); + Map> streamsIndex = Map.of( + STREAM_0, List.of(new StreamOffsetRange(STREAM_0, 20, 25)), + STREAM_1, List.of(new StreamOffsetRange(STREAM_1, 60, 120)) + ); + S3WALObject walObject = new S3WALObject(OBJECT_1, BROKER_0, streamsIndex, OBJECT_1); + S3_WAL_OBJECT_METADATA_LIST.add(new S3WALObjectMetadata(walObject, objectMetadata)); + }).join(); + + // stream data for object 2 + objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).thenAccept(objectId -> { + Assertions.assertEquals(OBJECT_2, objectId); + ObjectWriter objectWriter = new ObjectWriter(OBJECT_2, s3Operator, 1024, 1024); + // redundant record + StreamRecordBatch r6 = new StreamRecordBatch(STREAM_1, 0, 260, 20, TestUtils.random(20)); + StreamRecordBatch r7 = new StreamRecordBatch(STREAM_1, 0, 400, 100, TestUtils.random(100)); + StreamRecordBatch r8 = new StreamRecordBatch(STREAM_2, 0, 230, 40, TestUtils.random(40)); + objectWriter.write(STREAM_1, List.of(r6)); + objectWriter.write(STREAM_1, List.of(r7)); + objectWriter.write(STREAM_2, List.of(r8)); + objectWriter.close().join(); + S3ObjectMetadata objectMetadata = new S3ObjectMetadata(OBJECT_2, objectWriter.size(), S3ObjectType.WAL); + Map> streamsIndex = Map.of( + STREAM_1, List.of(new StreamOffsetRange(STREAM_1, 400, 500)), + STREAM_2, List.of(new StreamOffsetRange(STREAM_2, 230, 270)) + ); + S3WALObject walObject = new S3WALObject(OBJECT_2, BROKER_0, streamsIndex, OBJECT_2); + S3_WAL_OBJECT_METADATA_LIST.add(new S3WALObjectMetadata(walObject, objectMetadata)); + }).join(); + } + + public void tearDown() { + S3_WAL_OBJECT_METADATA_LIST.clear(); + objectManager.shutdown(); + } + + protected boolean compare(StreamDataBlock block1, StreamDataBlock block2) { + return block1.getStreamId() == block2.getStreamId() && + block1.getStartOffset() == block2.getStartOffset() && + block1.getEndOffset() == block2.getEndOffset() && + block1.getRecordCount() == block2.getRecordCount() && + block1.getObjectId() == block2.getObjectId(); + } + + protected boolean compare(List streamDataBlocks1, List streamDataBlocks2) { + if (streamDataBlocks1.size() != 
streamDataBlocks2.size()) { + return false; + } + for (int i = 0; i < streamDataBlocks1.size(); i++) { + if (!compare(streamDataBlocks1.get(i), streamDataBlocks2.get(i))) { + return false; + } + } + return true; + } + + protected boolean compare(CompactedObjectBuilder builder1, CompactedObjectBuilder builder2) { + if (builder1.type() != builder2.type()) { + return false; + } + return compare(builder1.streamDataBlocks(), builder2.streamDataBlocks()); + } + + protected boolean compare(CompactedObject compactedObject1, CompactedObject compactedObject2) { + if (compactedObject1.type() != compactedObject2.type()) { + return false; + } + return compare(compactedObject1.streamDataBlocks(), compactedObject2.streamDataBlocks()); + } + + protected long calculateObjectSize(List streamDataBlocks) { + long bodySize = streamDataBlocks.stream().mapToLong(StreamDataBlock::getBlockSize).sum(); + long indexBlockSize = 4 + 40L * streamDataBlocks.size(); + long tailSize = 48; + return bodySize + indexBlockSize + tailSize; + } +} diff --git a/core/src/test/java/kafka/log/s3/compact/CompactionUploaderTest.java b/core/src/test/java/kafka/log/s3/compact/CompactionUploaderTest.java new file mode 100644 index 0000000000..99af740b9d --- /dev/null +++ b/core/src/test/java/kafka/log/s3/compact/CompactionUploaderTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.log.s3.compact; + +import kafka.log.s3.TestUtils; +import kafka.log.s3.compact.objects.CompactedObject; +import kafka.log.s3.compact.objects.CompactionType; +import kafka.log.s3.compact.objects.StreamDataBlock; +import kafka.log.s3.compact.operator.DataBlockReader; +import kafka.log.s3.memory.MemoryMetadataManager; +import kafka.log.s3.objects.ObjectStreamRange; +import kafka.log.s3.objects.StreamObject; +import kafka.log.s3.operator.MemoryS3Operator; +import kafka.server.KafkaConfig; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; +import org.apache.kafka.metadata.stream.S3ObjectType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.mockito.Mockito; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +@Timeout(30) +@Tag("S3Unit") +public class CompactionUploaderTest extends CompactionTestBase { + + private MemoryMetadataManager objectManager; + private KafkaConfig config; + + @BeforeEach + public void setUp() throws Exception { + s3Operator = new MemoryS3Operator(); + objectManager = new MemoryMetadataManager(); + objectManager.start(); + config = Mockito.mock(KafkaConfig.class); + Mockito.when(config.s3ObjectCompactionNWOutBandwidth()).thenReturn(500L); + Mockito.when(config.s3ObjectCompactionUploadConcurrency()).thenReturn(3); + Mockito.when(config.s3ObjectPartSize()).thenReturn(100); + } + + @AfterEach + public void tearDown() { + objectManager.shutdown(); + } + + @Test + public void testWriteWALObject() { + List streamDataBlocks = List.of( + new StreamDataBlock(STREAM_0, 0, 20, -1, -1, 0, 20, 1), + new StreamDataBlock(STREAM_0, 20, 25, -1, -1, 20, 5, 1), + new StreamDataBlock(STREAM_2, 40, 120, -1, -1, 25, 80, 1), + new StreamDataBlock(STREAM_2, 120, 150, -1, -1, 105, 30, 1)); + CompactedObject compactedObject = new CompactedObject(CompactionType.COMPACT, streamDataBlocks, 100); + List result = CompactionUtils.buildObjectStreamRange(compactedObject); + Assertions.assertEquals(2, result.size()); + Assertions.assertEquals(STREAM_0, result.get(0).getStreamId()); + Assertions.assertEquals(0, result.get(0).getStartOffset()); + Assertions.assertEquals(25, result.get(0).getEndOffset()); + Assertions.assertEquals(STREAM_2, result.get(1).getStreamId()); + Assertions.assertEquals(40, result.get(1).getStartOffset()); + Assertions.assertEquals(150, result.get(1).getEndOffset()); + + CompactionUploader uploader = new CompactionUploader(objectManager, s3Operator, config); + CompletableFuture> cf = uploader.writeWALObject(compactedObject); + for (StreamDataBlock streamDataBlock : streamDataBlocks) { + streamDataBlock.getDataCf().complete(TestUtils.random((int) streamDataBlock.getStreamRangeSize())); + } + CompletableFuture writeCf = cf.join(); + uploader.forceUploadWAL(); + writeCf.join(); + long walObjectSize = uploader.completeWAL(); + System.out.printf("write size: %d%n", walObjectSize); + Assertions.assertEquals(walObjectSize, calculateObjectSize(streamDataBlocks)); + + //check s3 object + DataBlockReader reader = new DataBlockReader(new S3ObjectMetadata(OBJECT_0, walObjectSize, S3ObjectType.WAL), s3Operator); + reader.parseDataBlockIndex(); + List streamDataBlocksFromS3 = reader.getDataBlockIndex().join(); + for (int i = 0; i < streamDataBlocks.size(); i++) { + compare(streamDataBlocksFromS3.get(i), streamDataBlocks.get(i)); + } + List 
blockIndices = CompactionUtils.buildBlockIndicesFromStreamDataBlock(streamDataBlocksFromS3); + List dataBlocks = reader.readBlocks(blockIndices).join(); + for (int i = 0; i < dataBlocks.size(); i++) { + Assertions.assertEquals(streamDataBlocks.get(i).getDataCf().join(), dataBlocks.get(i).buffer()); + } + } + + @Test + public void testWriteStreamObject() { + List streamDataBlocks = List.of( + new StreamDataBlock(STREAM_0, 0, 60, -1, -1, 0, 60, 1), + new StreamDataBlock(STREAM_0, 60, 120, -1, -1, 60, 60, 1)); + CompactedObject compactedObject = new CompactedObject(CompactionType.SPLIT, streamDataBlocks, 100); + + CompactionUploader uploader = new CompactionUploader(objectManager, s3Operator, config); + CompletableFuture cf = uploader.writeStreamObject(compactedObject); + for (StreamDataBlock streamDataBlock : streamDataBlocks) { + streamDataBlock.getDataCf().complete(TestUtils.random((int) streamDataBlock.getStreamRangeSize())); + } + StreamObject streamObject = cf.join(); + System.out.printf("write size: %d%n", streamObject.getObjectSize()); + Assertions.assertEquals(streamObject.getObjectSize(), calculateObjectSize(streamDataBlocks)); + + //check s3 object + DataBlockReader reader = new DataBlockReader(new S3ObjectMetadata(OBJECT_0, streamObject.getObjectSize(), S3ObjectType.STREAM), s3Operator); + reader.parseDataBlockIndex(); + List streamDataBlocksFromS3 = reader.getDataBlockIndex().join(); + for (int i = 0; i < streamDataBlocks.size(); i++) { + compare(streamDataBlocksFromS3.get(i), streamDataBlocks.get(i)); + } + List blockIndices = CompactionUtils.buildBlockIndicesFromStreamDataBlock(streamDataBlocksFromS3); + List dataBlocks = reader.readBlocks(blockIndices).join(); + for (int i = 0; i < dataBlocks.size(); i++) { + Assertions.assertEquals(streamDataBlocks.get(i).getDataCf().join(), dataBlocks.get(i).buffer()); + } + } +} diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 27bb183a88..cb1f970e3d 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -34,6 +34,7 @@ import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.metadata.stream.S3StreamObject; +import org.apache.kafka.metadata.stream.S3WALObject; import org.apache.kafka.server.common.ApiMessageAndVersion; public final class S3StreamsMetadataImage { @@ -125,6 +126,14 @@ public List getStreamObjects(long streamId, long startOffset, lo }).sorted(Comparator.comparing(S3StreamObject::streamOffsetRange)).limit(limit).collect(Collectors.toCollection(ArrayList::new)); } + public List getWALObjects(int brokerId) { + BrokerS3WALMetadataImage wal = brokerWALMetadata.get(brokerId); + if (wal == null) { + return Collections.emptyList(); + } + return wal.getWalObjects().list(); + } + private List rangeSearchers(long streamId, long startOffset, long endOffset) { S3StreamMetadataImage streamMetadata = streamsMetadata.get(streamId); List rangeSearchers = new ArrayList<>(); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObjectMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObjectMetadata.java new file mode 100644 index 0000000000..84ea92afcf --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObjectMetadata.java @@ -0,0 +1,36 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.stream; + +public class S3WALObjectMetadata { + private final S3WALObject walObject; + private final S3ObjectMetadata objectMetadata; + + public S3WALObjectMetadata(S3WALObject walObject, S3ObjectMetadata objectMetadata) { + this.walObject = walObject; + this.objectMetadata = objectMetadata; + } + + public S3WALObject getWalObject() { + return walObject; + } + + public S3ObjectMetadata getObjectMetadata() { + return objectMetadata; + } +}
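
For review context, a minimal sketch (not part of this patch) of how one compaction round fits together, following the call sequence exercised in CompactionManagerTest above: fetch the broker's WAL objects from StreamMetadataManager, let CompactionAnalyzer produce per-iteration plans, and build the CommitWALObjectRequest from them. CompactionRoundSketch and runOnce are illustrative names; scheduling, bandwidth throttling, and error handling are omitted.

package kafka.log.s3.compact;

import java.util.List;

import kafka.log.s3.metadata.StreamMetadataManager;
import kafka.log.s3.objects.CommitWALObjectRequest;
import org.apache.kafka.metadata.stream.S3WALObjectMetadata;

class CompactionRoundSketch {
    CommitWALObjectRequest runOnce(StreamMetadataManager metadataManager,
                                   CompactionAnalyzer analyzer,
                                   CompactionManager manager) {
        // WAL objects currently owned by this broker (WAL metadata plus object size)
        List<S3WALObjectMetadata> walObjects = metadataManager.getWALObjects();
        // split/compact decisions, grouped into memory-bounded iterations
        List<CompactionPlan> plans = analyzer.analyze(walObjects);
        // request carrying the new WAL object, the split stream objects and the compacted object ids
        return manager.buildCompactRequest(plans, walObjects);
    }
}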
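
A minimal reader sketch for the object layout produced by DataBlockWriter above: a 48-byte footer carrying the index block position, its size, and the magic 0x88e241b785f4cff7L, preceded in the object by an index block of 4 + 40 * blockCount bytes. DataBlockIndexFooterReader is illustrative, and it assumes the new IndexBlockParseException is the intended signal for a corrupt footer.

import io.netty.buffer.ByteBuf;

import kafka.log.s3.exceptions.IndexBlockParseException;

final class DataBlockIndexFooterReader {
    static final int FOOTER_SIZE = 48;
    static final long MAGIC = 0x88e241b785f4cff7L;

    /** Footer layout: index position (8B), index size (4B), 28B reserved zeros, magic (8B). */
    static long[] readFooter(ByteBuf footer) throws IndexBlockParseException {
        long indexBlockPosition = footer.readLong();
        int indexBlockSize = footer.readInt();
        footer.skipBytes(40 - 8 - 4); // reserved, written as zeros by DataBlockWriter.Footer
        if (footer.readLong() != MAGIC) {
            throw new IndexBlockParseException(indexBlockPosition);
        }
        return new long[]{indexBlockPosition, indexBlockSize};
    }

    /** Index layout: block count (4B), then 16B per data block, then 24B per stream range. */
    static void readIndexBlock(ByteBuf index) {
        int blockCount = index.readInt();
        for (int i = 0; i < blockCount; i++) {
            long blockPosition = index.readLong(); // offset of the data block within the object
            int blockSize = index.readInt();
            int recordCount = index.readInt();
        }
        for (int i = 0; i < blockCount; i++) {
            long streamId = index.readLong();
            long startOffset = index.readLong();
            int rangeSize = index.readInt();  // endOffset - startOffset
            int blockIndex = index.readInt(); // refers back to the data block entries above
        }
    }
}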
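
A simplified, self-contained sketch of the part-batching pattern DataBlockWriter relies on now that Writer#write returns a CompletableFuture: blocks are buffered until their combined size reaches partSizeThreshold, uploaded as one composite part, and each block's future is completed only after that part upload finishes. BatchingUploader is a stand-in for illustration, not the patch's implementation.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class BatchingUploader {
    private final int partSizeThreshold;
    private final Function<ByteBuf, CompletableFuture<?>> uploadPart; // e.g. Writer#write
    private final List<ByteBuf> pendingBlocks = new ArrayList<>();
    private final Map<ByteBuf, CompletableFuture<Void>> pendingCfs = new HashMap<>();

    BatchingUploader(int partSizeThreshold, Function<ByteBuf, CompletableFuture<?>> uploadPart) {
        this.partSizeThreshold = partSizeThreshold;
        this.uploadPart = uploadPart;
    }

    /** Buffers a block; triggers an upload once the buffered size reaches the part threshold. */
    CompletableFuture<Void> write(ByteBuf block) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        pendingBlocks.add(block);
        pendingCfs.put(block, cf);
        long pendingBytes = pendingBlocks.stream().mapToLong(ByteBuf::readableBytes).sum();
        if (pendingBytes >= partSizeThreshold) {
            flush();
        }
        return cf;
    }

    /** Uploads all buffered blocks as one composite part and completes their futures afterwards. */
    void flush() {
        if (pendingBlocks.isEmpty()) {
            return;
        }
        List<ByteBuf> batch = new ArrayList<>(pendingBlocks);
        pendingBlocks.clear();
        CompositeByteBuf part = Unpooled.compositeBuffer();
        batch.forEach(b -> part.addComponent(true, b));
        uploadPart.apply(part).thenRun(() -> batch.forEach(b -> pendingCfs.remove(b).complete(null)));
    }
}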
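
Finally, an illustrative broker properties snippet for the compaction settings introduced in KafkaConfig above; the property keys come from this patch, and the values simply restate the defaults.

import java.util.Properties;

public class CompactionConfigExample {
    public static Properties compactionOverrides() {
        Properties props = new Properties();
        props.setProperty("s3.object.compaction.interval.minutes", "20");                                   // run compaction every 20 minutes
        props.setProperty("s3.object.compaction.cache.size", String.valueOf(2L * 1024 * 1024 * 1024));      // 2 GiB compaction cache
        props.setProperty("s3.object.compaction.network.in.bandwidth", String.valueOf(50L * 1024 * 1024));  // 50 MiB/s inbound
        props.setProperty("s3.object.compaction.network.out.bandwidth", String.valueOf(50L * 1024 * 1024)); // 50 MiB/s outbound
        props.setProperty("s3.object.compaction.upload.concurrency", "8");
        props.setProperty("s3.object.compaction.execution.score.threshold", "0.5");
        props.setProperty("s3.object.compaction.stream.split.size", String.valueOf(16L * 1024 * 1024));     // 16 MiB split threshold
        props.setProperty("s3.object.compaction.force.split.time", "120");                                  // force split period, minutes
        return props;
    }
}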