diff --git a/config/kraft/server.properties b/config/kraft/server.properties
index 361c96cb51..d08adcda07 100644
--- a/config/kraft/server.properties
+++ b/config/kraft/server.properties
@@ -158,4 +158,10 @@ elasticstream.endpoint=s3://
 s3.endpoint=https://s3.amazonaws.com
 s3.region=us-east-1
 s3.bucket=ko3
-s3.wal.path=/tmp/kraft-combined-logs/s3wal
\ No newline at end of file
+s3.wal.path=/tmp/kraft-combined-logs/s3wal
+# 10 minutes
+s3.object.compaction.interval.minutes=10
+# 2G
+s3.object.compaction.cache.size=2147483648
+# 16MB
+s3.object.compaction.stream.split.size=16777216
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/s3/compact/CompactionAnalyzer.java b/core/src/main/scala/kafka/log/s3/compact/CompactionAnalyzer.java
index 1fc5f4610b..71b9b2e845 100644
--- a/core/src/main/scala/kafka/log/s3/compact/CompactionAnalyzer.java
+++ b/core/src/main/scala/kafka/log/s3/compact/CompactionAnalyzer.java
@@ -21,28 +21,33 @@
 import kafka.log.s3.compact.objects.CompactedObjectBuilder;
 import kafka.log.s3.compact.objects.CompactionType;
 import kafka.log.s3.compact.objects.StreamDataBlock;
-import kafka.log.s3.compact.operator.DataBlockReader;
 import kafka.log.s3.operator.S3Operator;
-import org.apache.kafka.metadata.stream.S3WALObject;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.stream.S3WALObjectMetadata;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 public class CompactionAnalyzer {
-    private final static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CompactionAnalyzer.class);
+    private final Logger logger;
     private final long compactionCacheSize;
     private final double executionScoreThreshold;
     private final long streamSplitSize;
     private final S3Operator s3Operator;
 
     public CompactionAnalyzer(long compactionCacheSize, double executionScoreThreshold, long streamSplitSize, S3Operator s3Operator) {
+        this(compactionCacheSize, executionScoreThreshold, streamSplitSize, s3Operator, new LogContext("[CompactionAnalyzer]"));
+    }
+
+    public CompactionAnalyzer(long compactionCacheSize, double executionScoreThreshold, long streamSplitSize, S3Operator s3Operator, LogContext logContext) {
+        this.logger = logContext.logger(CompactionAnalyzer.class);
         this.compactionCacheSize = compactionCacheSize;
         this.executionScoreThreshold = executionScoreThreshold;
         this.streamSplitSize = streamSplitSize;
@@ -50,6 +55,9 @@ public CompactionAnalyzer(long compactionCacheSize, double executionScoreThresho
     }
 
     public List<CompactionPlan> analyze(List<S3WALObjectMetadata> objectMetadataList) {
+        if (objectMetadataList.isEmpty()) {
+            return new ArrayList<>();
+        }
         List<CompactionPlan> compactionPlans = new ArrayList<>();
         try {
             List<CompactedObjectBuilder> compactedObjectBuilders = buildCompactedObjects(objectMetadataList);
@@ -90,7 +98,7 @@ public List analyze(List objectMetadataList
             }
             return compactionPlans;
         } catch (Exception e) {
-            LOGGER.error("Error while analyzing compaction plan", e);
+            logger.error("Error while analyzing compaction plan", e);
         }
         return compactionPlans;
     }
@@ -133,40 +141,13 @@ private CompactionPlan generateCompactionPlan(List compactedObj
     }
 
     public List<CompactedObjectBuilder> buildCompactedObjects(List<S3WALObjectMetadata> objects) {
-        List<StreamDataBlock> streamDataBlocks = blockWaitObjectIndices(objects);
-        if (!shouldCompact(streamDataBlocks)) {
+        Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(objects, s3Operator);
+        Map<Long, List<StreamDataBlock>> filteredMap = filterBlocksToCompact(streamDataBlocksMap);
+        this.logger.info("{} WAL objects to compact after filter", filteredMap.size());
+        if (filteredMap.isEmpty()) {
             return new ArrayList<>();
         }
-        return compactObjects(sortStreamRangePositions(streamDataBlocks));
-    }
-
-    List<StreamDataBlock> blockWaitObjectIndices(List<S3WALObjectMetadata> objectMetadataList) {
-        Map<Long, S3WALObject> s3WALObjectMap = objectMetadataList.stream()
-                .collect(Collectors.toMap(e -> e.getWalObject().objectId(), S3WALObjectMetadata::getWalObject));
-        List<CompletableFuture<List<StreamDataBlock>>> objectStreamRangePositionFutures = new ArrayList<>();
-        for (S3WALObjectMetadata walObjectMetadata : objectMetadataList) {
-            DataBlockReader dataBlockReader = new DataBlockReader(walObjectMetadata.getObjectMetadata(), s3Operator);
-            dataBlockReader.parseDataBlockIndex();
-            objectStreamRangePositionFutures.add(dataBlockReader.getDataBlockIndex());
-        }
-        return objectStreamRangePositionFutures.stream().flatMap(f -> {
-            try {
-                List<StreamDataBlock> streamDataBlocks = f.join();
-                List<StreamDataBlock> validStreamDataBlocks = new ArrayList<>();
-                S3WALObject s3WALObject = s3WALObjectMap.get(streamDataBlocks.get(0).getObjectId());
-                // filter out invalid stream data blocks in case metadata is inconsistent with S3 index block
-                for (StreamDataBlock streamDataBlock : streamDataBlocks) {
-                    if (s3WALObject.intersect(streamDataBlock.getStreamId(), streamDataBlock.getStartOffset(), streamDataBlock.getEndOffset())) {
-                        validStreamDataBlocks.add(streamDataBlock);
-                    }
-                }
-                return validStreamDataBlocks.stream();
-            } catch (Exception ex) {
-                // continue compaction without invalid object
-                LOGGER.error("Read on invalid object ", ex);
-                return null;
-            }
-        }).collect(Collectors.toList());
+        return compactObjects(sortStreamRangePositions(filteredMap));
     }
 
     private List<CompactedObjectBuilder> compactObjects(List<StreamDataBlock> streamDataBlocks) {
@@ -186,7 +167,7 @@ private List compactObjects(List stream
                 builder.addStreamDataBlock(streamDataBlock);
             } else {
                 // should not go there
-                LOGGER.error("FATAL ERROR: illegal stream range position, last offset: {}, curr: {}",
+                logger.error("FATAL ERROR: illegal stream range position, last offset: {}, curr: {}",
                         builder.lastOffset(), streamDataBlock);
                 return new ArrayList<>();
             }
@@ -198,12 +179,18 @@ private List compactObjects(List stream
         return compactedObjectBuilders;
     }
 
-    boolean shouldCompact(List<StreamDataBlock> streamDataBlocks) {
-        // do compact if there is any stream with data placed in more than one WAL objects
-        Map<Long, Integer> streamIdToDistinctObjectMap = streamDataBlocks.stream()
-                .collect(Collectors.groupingBy(StreamDataBlock::getStreamId, Collectors.mapping(StreamDataBlock::getObjectId, Collectors.toSet())))
-                .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
-        return streamIdToDistinctObjectMap.values().stream().filter(v -> v > 1).count() > 1;
+    Map<Long, List<StreamDataBlock>> filterBlocksToCompact(Map<Long, List<StreamDataBlock>> streamDataBlocksMap) {
+        Map<Long, Set<Long>> streamToObjectIds = streamDataBlocksMap.values().stream()
+                .flatMap(Collection::stream)
+                .collect(Collectors.groupingBy(StreamDataBlock::getStreamId, Collectors.mapping(StreamDataBlock::getObjectId, Collectors.toSet())));
+        Set<Long> objectIdsToCompact = streamToObjectIds
+                .entrySet().stream()
+                .filter(e -> e.getValue().size() > 1)
+                .flatMap(e -> e.getValue().stream())
+                .collect(Collectors.toSet());
+        return streamDataBlocksMap.entrySet().stream()
+                .filter(e -> objectIdsToCompact.contains(e.getKey()))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
 
     private CompactedObjectBuilder splitAndAddBlock(CompactedObjectBuilder builder,
@@ -228,11 +215,11 @@ private CompactedObjectBuilder splitObject(CompactedObjectBuilder builder,
         return builder;
     }
 
-    List<StreamDataBlock> sortStreamRangePositions(List<StreamDataBlock> streamDataBlocks) {
+    List<StreamDataBlock> sortStreamRangePositions(Map<Long, List<StreamDataBlock>> streamDataBlocksMap) {
         //TODO: use merge sort
         Map<Long, List<StreamDataBlock>> sortedStreamObjectMap = new TreeMap<>();
-        for (StreamDataBlock streamDataBlock : streamDataBlocks) {
-            sortedStreamObjectMap.computeIfAbsent(streamDataBlock.getStreamId(), k -> new ArrayList<>()).add(streamDataBlock);
+        for (List<StreamDataBlock> streamDataBlocks : streamDataBlocksMap.values()) {
+            streamDataBlocks.forEach(e -> sortedStreamObjectMap.computeIfAbsent(e.getStreamId(), k -> new ArrayList<>()).add(e));
         }
         return sortedStreamObjectMap.values().stream().flatMap(list -> {
             list.sort(StreamDataBlock.STREAM_OFFSET_COMPARATOR);
diff --git a/core/src/main/scala/kafka/log/s3/compact/CompactionManager.java b/core/src/main/scala/kafka/log/s3/compact/CompactionManager.java
index 8df10adc24..c6be5ff815 100644
--- a/core/src/main/scala/kafka/log/s3/compact/CompactionManager.java
+++ b/core/src/main/scala/kafka/log/s3/compact/CompactionManager.java
@@ -28,27 +28,29 @@
 import kafka.log.s3.objects.StreamObject;
 import kafka.log.s3.operator.S3Operator;
 import kafka.server.KafkaConfig;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.stream.S3ObjectMetadata;
 import org.apache.kafka.metadata.stream.S3WALObjectMetadata;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class CompactionManager {
-    private final static Logger LOGGER = LoggerFactory.getLogger(CompactionManager.class);
+    private final Logger logger;
     private final ObjectManager objectManager;
     private final StreamMetadataManager streamMetadataManager;
     private final S3Operator s3Operator;
     private final CompactionAnalyzer compactionAnalyzer;
-    private final ScheduledExecutorService executorService;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final ExecutorService executorService;
     private final CompactionUploader uploader;
     private final long compactionCacheSize;
     private final double executionScoreThreshold;
@@ -58,6 +60,7 @@ public class CompactionManager {
     private final TokenBucketThrottle networkInThrottle;
 
     public CompactionManager(KafkaConfig config, ObjectManager objectManager, StreamMetadataManager streamMetadataManager, S3Operator s3Operator) {
+        this.logger = new LogContext(String.format("[CompactionManager id=%d] ", config.brokerId())).logger(CompactionManager.class);
         this.objectManager = objectManager;
         this.streamMetadataManager = streamMetadataManager;
         this.s3Operator = s3Operator;
@@ -68,51 +71,64 @@ public CompactionManager(KafkaConfig config, ObjectManager objectManager, Stream
         this.forceSplitObjectPeriod = config.s3ObjectCompactionForceSplitPeriod();
         this.networkInThrottle = new TokenBucketThrottle(config.s3ObjectCompactionNWInBandwidth());
         this.uploader = new CompactionUploader(objectManager, s3Operator, config);
-        this.compactionAnalyzer = new CompactionAnalyzer(compactionCacheSize, executionScoreThreshold, streamSplitSize, s3Operator);
-        this.executorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("object-compaction-manager"));
+        this.compactionAnalyzer = new CompactionAnalyzer(compactionCacheSize, executionScoreThreshold, streamSplitSize,
+                s3Operator, new LogContext(String.format("[CompactionAnalyzer id=%d] ", config.brokerId())));
+        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("object-compaction-manager"));
+        this.executorService = Executors.newFixedThreadPool(8, new DefaultThreadFactory("force-split-executor"));
+        this.logger.info("Compaction manager initialized with config: compactionInterval: {} min, compactionCacheSize: {} bytes, " +
+                "executionScoreThreshold: {}, streamSplitSize: {} bytes, forceSplitObjectPeriod: {} min",
+                compactionInterval, compactionCacheSize, executionScoreThreshold, streamSplitSize, forceSplitObjectPeriod);
     }
 
     public void start() {
-        this.executorService.scheduleWithFixedDelay(() -> {
+        this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
+            logger.info("Compaction started");
             long start = System.currentTimeMillis();
             this.compact()
-                    .thenAccept(result -> LOGGER.info("Compaction complete, cost {} ms, result {}",
+                    .thenAccept(result -> logger.info("Compaction complete, total cost {} ms, result {}",
                             System.currentTimeMillis() - start, result))
                     .exceptionally(ex -> {
-                        LOGGER.error("Compaction failed, cost {} ms, ", System.currentTimeMillis() - start, ex);
+                        logger.error("Compaction failed, cost {} ms, ", System.currentTimeMillis() - start, ex);
                         return null;
                     });
-        }, 0, this.compactionInterval, TimeUnit.MINUTES);
+        }, 1, this.compactionInterval, TimeUnit.MINUTES);
     }
 
     public void shutdown() {
-        this.executorService.shutdown();
+        this.scheduledExecutorService.shutdown();
         this.networkInThrottle.stop();
         this.uploader.stop();
     }
 
     public CompletableFuture<CompactResult> compact() {
         List<S3WALObjectMetadata> s3ObjectMetadata = this.streamMetadataManager.getWALObjects();
+        logger.info("Get {} WAL objects from metadata", s3ObjectMetadata.size());
         Map<Boolean, List<S3WALObjectMetadata>> objectMetadataFilterMap = s3ObjectMetadata.stream()
                 .collect(Collectors.partitioningBy(e -> (System.currentTimeMillis() - e.getWalObject().dataTimeInMs())
-                        >= this.forceSplitObjectPeriod));
+                        >= TimeUnit.MINUTES.toMillis(this.forceSplitObjectPeriod)));
         // force split objects that exists for too long
-        splitWALObjects(objectMetadataFilterMap.get(true)).thenAccept(v -> LOGGER.info("Force split {} objects",
-                objectMetadataFilterMap.get(true).size()));
+        logger.info("{} WAL objects need force split", objectMetadataFilterMap.get(true).size());
+        long splitStart = System.currentTimeMillis();
+        splitWALObjects(objectMetadataFilterMap.get(true)).thenAccept(v ->
+                logger.info("Force split {} objects, time cost: {} ms", objectMetadataFilterMap.get(true).size(), System.currentTimeMillis() - splitStart));
 
         try {
+            logger.info("{} WAL objects as compact candidates", objectMetadataFilterMap.get(false).size());
+            long compactionStart = System.currentTimeMillis();
             List<CompactionPlan> compactionPlans = this.compactionAnalyzer.analyze(objectMetadataFilterMap.get(false));
+            logger.info("Analyze compaction plans complete, {} plans generated", compactionPlans.size());
             if (compactionPlans.isEmpty()) {
                 return CompletableFuture.completedFuture(CompactResult.SKIPPED);
             }
             CommitWALObjectRequest request = buildCompactRequest(compactionPlans, s3ObjectMetadata);
+            logger.info("Build compact request complete, time cost: {} ms, start committing objects", System.currentTimeMillis() - compactionStart);
             return objectManager.commitWALObject(request).thenApply(nil -> {
-                LOGGER.info("Commit compact request succeed, WAL object id: {}, size: {}, stream object num: {}",
-                        request.getObjectId(), request.getObjectSize(), request.getStreamObjects().size());
+                logger.info("Commit compact request succeed, WAL object id: {}, size: {}, stream object num: {}, time cost: {} ms",
+                        request.getObjectId(), request.getObjectSize(), request.getStreamObjects().size(), System.currentTimeMillis() - compactionStart);
                 return CompactResult.SUCCESS;
             });
         } catch (Exception e) {
-            LOGGER.error("Error while compaction objects", e);
+            logger.error("Error while compaction objects", e);
            return CompletableFuture.failedFuture(e);
         }
 
@@ -121,7 +137,7 @@ public CompletableFuture compact() {
     public CompletableFuture<Void> forceSplitAll() {
         CompletableFuture<Void> cf = new CompletableFuture<>();
         //TODO: deal with metadata delay
-        this.executorService.execute(() -> splitWALObjects(this.streamMetadataManager.getWALObjects())
+        this.scheduledExecutorService.execute(() -> splitWALObjects(this.streamMetadataManager.getWALObjects())
                 .thenAccept(v -> cf.complete(null))
                 .exceptionally(ex -> {
                     cf.completeExceptionally(ex);
@@ -132,7 +148,27 @@ public CompletableFuture forceSplitAll() {
     }
 
     CompletableFuture<Void> splitWALObjects(List<S3WALObjectMetadata> objectMetadataList) {
-        return CompletableFuture.completedFuture(null);
+        if (objectMetadataList.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(objectMetadataList, s3Operator);
+        List<CompletableFuture<Void>> splitFutureList = new ArrayList<>();
+        for (Map.Entry<Long, List<StreamDataBlock>> entry : streamDataBlocksMap.entrySet()) {
+            List<StreamDataBlock> streamDataBlocks = entry.getValue();
+            for (StreamDataBlock streamDataBlock : streamDataBlocks) {
+                splitFutureList.add(objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30))
+                        .thenAcceptAsync(objectId -> {
+                            logger.info("Split {} to {}", streamDataBlock, objectId);
+                            //TODO: implement this
+                        }, executorService).exceptionally(ex -> {
+                            logger.error("Prepare object failed", ex);
+                            return null;
+                        })
+                );
+            }
+        }
+        return CompletableFuture.allOf(splitFutureList.toArray(new CompletableFuture[0]));
     }
 
     CommitWALObjectRequest buildCompactRequest(List<CompactionPlan> compactionPlans, List<S3WALObjectMetadata> s3ObjectMetadata)
@@ -154,7 +190,7 @@ CommitWALObjectRequest buildCompactRequest(List compactionPlans,
                     streamDataBlock.getDataCf().complete(dataBlocks.get(i).buffer());
                 }
             }).exceptionally(ex -> {
-                LOGGER.error("read on invalid object {}, ex ", metadata.key(), ex);
+                logger.error("read on invalid object {}, ex ", metadata.key(), ex);
                 for (int i = 0; i < blockIndices.size(); i++) {
                     StreamDataBlock streamDataBlock = streamDataBlocks.get(i);
                     streamDataBlock.getDataCf().completeExceptionally(ex);
@@ -186,11 +222,10 @@ CommitWALObjectRequest buildCompactRequest(List compactionPlans,
                 }
                 streamObjectCFList.stream().map(CompletableFuture::join).forEach(request::addStreamObject);
             } catch (Exception ex) {
-                LOGGER.error("Error while uploading compaction objects", ex);
+                logger.error("Error while uploading compaction objects", ex);
                 uploader.reset();
                 throw new IllegalArgumentException("Error while uploading compaction objects", ex);
             }
-//            compactionPlan.streamDataBlocksMap().values().forEach(e -> e.forEach(StreamDataBlock::free));
         }
         request.setObjectId(uploader.getWALObjectId());
         // set wal object id to be the first object id of compacted objects
diff --git a/core/src/main/scala/kafka/log/s3/compact/CompactionUtils.java b/core/src/main/scala/kafka/log/s3/compact/CompactionUtils.java
index 0b4e5a1e63..554c6f1f21 100644
--- a/core/src/main/scala/kafka/log/s3/compact/CompactionUtils.java
+++ b/core/src/main/scala/kafka/log/s3/compact/CompactionUtils.java
@@ -21,9 +21,18 @@
 import kafka.log.s3.compact.objects.StreamDataBlock;
 import kafka.log.s3.compact.operator.DataBlockReader;
 import kafka.log.s3.objects.ObjectStreamRange;
+import kafka.log.s3.operator.S3Operator;
+import org.apache.kafka.metadata.stream.S3WALObject;
+import org.apache.kafka.metadata.stream.S3WALObjectMetadata;
 
+import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 public class CompactionUtils {
     public static List<ObjectStreamRange> buildObjectStreamRange(CompactedObject compactedObject) {
@@ -55,4 +64,35 @@ public static List buildBlockIndicesFromStreamDa
         }
         return blockIndices;
     }
+
+    public static Map<Long, List<StreamDataBlock>> blockWaitObjectIndices(List<S3WALObjectMetadata> objectMetadataList, S3Operator s3Operator) {
+        Map<Long, S3WALObject> s3WALObjectMap = objectMetadataList.stream()
+                .collect(Collectors.toMap(e -> e.getWalObject().objectId(), S3WALObjectMetadata::getWalObject));
+        Map<Long, CompletableFuture<List<StreamDataBlock>>> objectStreamRangePositionFutures = new HashMap<>();
+        for (S3WALObjectMetadata walObjectMetadata : objectMetadataList) {
+            DataBlockReader dataBlockReader = new DataBlockReader(walObjectMetadata.getObjectMetadata(), s3Operator);
+            dataBlockReader.parseDataBlockIndex();
+            objectStreamRangePositionFutures.put(walObjectMetadata.getObjectMetadata().objectId(), dataBlockReader.getDataBlockIndex());
+        }
+        return objectStreamRangePositionFutures.entrySet().stream()
+                .map(f -> {
+                    try {
+                        List<StreamDataBlock> streamDataBlocks = f.getValue().join();
+                        List<StreamDataBlock> validStreamDataBlocks = new ArrayList<>();
+                        S3WALObject s3WALObject = s3WALObjectMap.get(streamDataBlocks.get(0).getObjectId());
+                        // filter out invalid stream data blocks in case metadata is inconsistent with S3 index block
+                        for (StreamDataBlock streamDataBlock : streamDataBlocks) {
+                            if (s3WALObject.intersect(streamDataBlock.getStreamId(), streamDataBlock.getStartOffset(), streamDataBlock.getEndOffset())) {
+                                validStreamDataBlocks.add(streamDataBlock);
+                            }
+                        }
+                        return new AbstractMap.SimpleEntry<>(f.getKey(), validStreamDataBlocks);
+                    } catch (Exception ex) {
+                        // continue compaction without invalid object
+                        return null;
+                    }
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
+    }
 }
diff --git a/core/src/test/java/kafka/log/s3/compact/CompactionAnalyzerTest.java b/core/src/test/java/kafka/log/s3/compact/CompactionAnalyzerTest.java
index 6ed80b78fe..668872a258 100644
--- a/core/src/test/java/kafka/log/s3/compact/CompactionAnalyzerTest.java
+++ b/core/src/test/java/kafka/log/s3/compact/CompactionAnalyzerTest.java
@@ -53,33 +53,59 @@ public void tearDown() {
 
     @Test
     public void testReadObjectIndices() {
+        Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(S3_WAL_OBJECT_METADATA_LIST, s3Operator);
+        Map<Long, List<StreamDataBlock>> expectedBlocksMap = Map.of(
+                OBJECT_0, List.of(
+                        new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1),
+                        new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1),
+                        new StreamDataBlock(STREAM_2, 30, 60, 2, OBJECT_0, -1, -1, 1)),
+                OBJECT_1, List.of(
+                        new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1),
+                        new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)),
+                OBJECT_2, List.of(
+                        new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1),
+                        new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)));
+        Assertions.assertTrue(compare(streamDataBlocksMap, expectedBlocksMap));
+    }
+
+    @Test
+    public void testFilterBlocksToCompact() {
         CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(CACHE_SIZE, EXECUTION_SCORE_THRESHOLD, STREAM_SPLIT_SIZE, s3Operator);
-        List<StreamDataBlock> streamDataBlocks = compactionAnalyzer.blockWaitObjectIndices(S3_WAL_OBJECT_METADATA_LIST);
-        List<StreamDataBlock> expectedBlocks = List.of(
-                new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1),
-                new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1),
-                new StreamDataBlock(STREAM_2, 30, 60, 2, OBJECT_0, -1, -1, 1),
-                new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1),
-                new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1),
-                new StreamDataBlock(STREAM_1, 400, 500, 0, OBJECT_2, -1, -1, 1),
-                new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1));
-        for (int i = 0; i < streamDataBlocks.size(); i++) {
-            Assertions.assertTrue(compare(streamDataBlocks.get(i), expectedBlocks.get(i)));
-        }
+        Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(S3_WAL_OBJECT_METADATA_LIST, s3Operator);
+        Map<Long, List<StreamDataBlock>> filteredMap = compactionAnalyzer.filterBlocksToCompact(streamDataBlocksMap);
+        Assertions.assertTrue(compare(filteredMap, streamDataBlocksMap));
     }
 
     @Test
-    public void testShouldCompact() {
+    public void testFilterBlocksToCompact2() {
         CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(CACHE_SIZE, EXECUTION_SCORE_THRESHOLD, STREAM_SPLIT_SIZE, s3Operator);
-        List<StreamDataBlock> streamDataBlocks = compactionAnalyzer.blockWaitObjectIndices(S3_WAL_OBJECT_METADATA_LIST);
-        Assertions.assertTrue(compactionAnalyzer.shouldCompact(streamDataBlocks));
+        Map<Long, List<StreamDataBlock>> streamDataBlocksMap = Map.of(
+                OBJECT_0, List.of(
+                        new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1),
+                        new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1)),
+                OBJECT_1, List.of(
+                        new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1),
+                        new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)),
+                OBJECT_2, List.of(
+                        new StreamDataBlock(STREAM_2, 230, 270, 1, OBJECT_2, -1, -1, 1)),
+                OBJECT_3, List.of(
+                        new StreamDataBlock(STREAM_3, 0, 50, 1, OBJECT_3, -1, -1, 1)));
+        Map<Long, List<StreamDataBlock>> result = compactionAnalyzer.filterBlocksToCompact(streamDataBlocksMap);
+        Map<Long, List<StreamDataBlock>> expectedBlocksMap = Map.of(
+                OBJECT_0, List.of(
+                        new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1),
+                        new StreamDataBlock(STREAM_1, 30, 60, 1, OBJECT_0, -1, -1, 1)),
+                OBJECT_1, List.of(
+                        new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1),
+                        new StreamDataBlock(STREAM_1, 60, 120, 1, OBJECT_1, -1, -1, 1)));
+        Assertions.assertTrue(compare(result, expectedBlocksMap));
     }
 
     @Test
     public void testSortStreamRangePositions() {
         CompactionAnalyzer compactionAnalyzer = new CompactionAnalyzer(CACHE_SIZE, EXECUTION_SCORE_THRESHOLD, STREAM_SPLIT_SIZE, s3Operator);
-        List<StreamDataBlock> streamDataBlocks = compactionAnalyzer.blockWaitObjectIndices(S3_WAL_OBJECT_METADATA_LIST);
-        List<StreamDataBlock> sortedStreamDataBlocks = compactionAnalyzer.sortStreamRangePositions(streamDataBlocks);
+        Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(S3_WAL_OBJECT_METADATA_LIST, s3Operator);
+        List<StreamDataBlock> sortedStreamDataBlocks = compactionAnalyzer.sortStreamRangePositions(streamDataBlocksMap);
         List<StreamDataBlock> expectedBlocks = List.of(
                 new StreamDataBlock(STREAM_0, 0, 20, 0, OBJECT_0, -1, -1, 1),
                 new StreamDataBlock(STREAM_0, 20, 25, 0, OBJECT_1, -1, -1, 1),
diff --git a/core/src/test/java/kafka/log/s3/compact/CompactionTestBase.java b/core/src/test/java/kafka/log/s3/compact/CompactionTestBase.java
index 857265e46f..0a08d351fb 100644
--- a/core/src/test/java/kafka/log/s3/compact/CompactionTestBase.java
+++ b/core/src/test/java/kafka/log/s3/compact/CompactionTestBase.java
@@ -43,9 +43,11 @@ public class CompactionTestBase {
     protected static final long STREAM_0 = 0;
     protected static final long STREAM_1 = 1;
     protected static final long STREAM_2 = 2;
+    protected static final long STREAM_3 = 3;
     protected static final long OBJECT_0 = 0;
     protected static final long OBJECT_1 = 1;
     protected static final long OBJECT_2 = 2;
+    protected static final long OBJECT_3 = 3;
     protected static final long CACHE_SIZE = 1024;
     protected static final double EXECUTION_SCORE_THRESHOLD = 0.5;
     protected static final long STREAM_SPLIT_SIZE = 30;
@@ -143,6 +145,21 @@ protected boolean compare(List<StreamDataBlock> streamDataBlocks1, List<StreamDataBlock> streamDataBlocks2) {
         return true;
     }
 
+    protected boolean compare(Map<Long, List<StreamDataBlock>> streamDataBlockMap1, Map<Long, List<StreamDataBlock>> streamDataBlockMap2) {
+        if (streamDataBlockMap1.size() != streamDataBlockMap2.size()) {
+            return false;
+        }
+        for (Map.Entry<Long, List<StreamDataBlock>> entry : streamDataBlockMap1.entrySet()) {
+            long objectId = entry.getKey();
+            List<StreamDataBlock> streamDataBlocks = entry.getValue();
+            Assertions.assertTrue(streamDataBlockMap2.containsKey(objectId));
+            if (!compare(streamDataBlocks, streamDataBlockMap2.get(objectId))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     protected boolean compare(CompactedObjectBuilder builder1, CompactedObjectBuilder builder2) {
         if (builder1.type() != builder2.type()) {
             return false;