8 changes: 7 additions & 1 deletion config/kraft/server.properties
@@ -158,4 +158,10 @@ elasticstream.endpoint=s3://
s3.endpoint=https://s3.amazonaws.com
s3.region=us-east-1
s3.bucket=ko3
s3.wal.path=/tmp/kraft-combined-logs/s3wal
s3.wal.path=/tmp/kraft-combined-logs/s3wal
# 10 minutes
s3.object.compaction.interval.minutes=10
# 2G
s3.object.compaction.cache.size=2147483648
# 16MB
s3.object.compaction.stream.split.size=16777216
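
Editor's note: the three new properties are plain numeric values (minutes for the interval, bytes for the cache and split sizes). Below is a minimal sketch of how such settings could be read and sanity-checked from a properties file; the CompactionConfig holder class is a hypothetical illustration, not the PR's actual config plumbing, which goes through KafkaConfig accessors such as config.s3ObjectCompactionForceSplitPeriod().

import java.util.Properties;

// Hypothetical holder for the three compaction settings above; for illustration only.
final class CompactionConfig {
    final long intervalMinutes;      // s3.object.compaction.interval.minutes
    final long cacheSizeBytes;       // s3.object.compaction.cache.size
    final long streamSplitSizeBytes; // s3.object.compaction.stream.split.size

    CompactionConfig(Properties props) {
        // Defaults mirror the sample server.properties: 10 min, 2 GiB, 16 MiB.
        intervalMinutes = Long.parseLong(props.getProperty("s3.object.compaction.interval.minutes", "10"));
        cacheSizeBytes = Long.parseLong(props.getProperty("s3.object.compaction.cache.size", "2147483648"));
        streamSplitSizeBytes = Long.parseLong(props.getProperty("s3.object.compaction.stream.split.size", "16777216"));
        if (streamSplitSizeBytes > cacheSizeBytes) {
            // A split unit larger than the compaction cache could never be buffered in memory.
            throw new IllegalArgumentException("stream split size must not exceed compaction cache size");
        }
    }
}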
81 changes: 34 additions & 47 deletions core/src/main/scala/kafka/log/s3/compact/CompactionAnalyzer.java
@@ -21,35 +21,43 @@
import kafka.log.s3.compact.objects.CompactedObjectBuilder;
import kafka.log.s3.compact.objects.CompactionType;
import kafka.log.s3.compact.objects.StreamDataBlock;
import kafka.log.s3.compact.operator.DataBlockReader;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.metadata.stream.S3WALObject;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.stream.S3WALObjectMetadata;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class CompactionAnalyzer {
private final static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CompactionAnalyzer.class);
private final Logger logger;
private final long compactionCacheSize;
private final double executionScoreThreshold;
private final long streamSplitSize;
private final S3Operator s3Operator;

public CompactionAnalyzer(long compactionCacheSize, double executionScoreThreshold, long streamSplitSize, S3Operator s3Operator) {
this(compactionCacheSize, executionScoreThreshold, streamSplitSize, s3Operator, new LogContext("[CompactionAnalyzer]"));
}

public CompactionAnalyzer(long compactionCacheSize, double executionScoreThreshold, long streamSplitSize, S3Operator s3Operator, LogContext logContext) {
this.logger = logContext.logger(CompactionAnalyzer.class);
this.compactionCacheSize = compactionCacheSize;
this.executionScoreThreshold = executionScoreThreshold;
this.streamSplitSize = streamSplitSize;
this.s3Operator = s3Operator;
}

public List<CompactionPlan> analyze(List<S3WALObjectMetadata> objectMetadataList) {
if (objectMetadataList.isEmpty()) {
return new ArrayList<>();
}
List<CompactionPlan> compactionPlans = new ArrayList<>();
try {
List<CompactedObjectBuilder> compactedObjectBuilders = buildCompactedObjects(objectMetadataList);
@@ -90,7 +98,7 @@ public List<CompactionPlan> analyze(List<S3WALObjectMetadata> objectMetadataList
}
return compactionPlans;
} catch (Exception e) {
LOGGER.error("Error while analyzing compaction plan", e);
logger.error("Error while analyzing compaction plan", e);
}
return compactionPlans;
}
@@ -133,40 +141,13 @@ private CompactionPlan generateCompactionPlan(List<CompactedObject> compactedObj
}

public List<CompactedObjectBuilder> buildCompactedObjects(List<S3WALObjectMetadata> objects) {
List<StreamDataBlock> streamDataBlocks = blockWaitObjectIndices(objects);
if (!shouldCompact(streamDataBlocks)) {
Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(objects, s3Operator);
Map<Long, List<StreamDataBlock>> filteredMap = filterBlocksToCompact(streamDataBlocksMap);
this.logger.info("{} WAL objects to compact after filter", filteredMap.size());
if (filteredMap.isEmpty()) {
return new ArrayList<>();
}
return compactObjects(sortStreamRangePositions(streamDataBlocks));
}

List<StreamDataBlock> blockWaitObjectIndices(List<S3WALObjectMetadata> objectMetadataList) {
Map<Long, S3WALObject> s3WALObjectMap = objectMetadataList.stream()
.collect(Collectors.toMap(e -> e.getWalObject().objectId(), S3WALObjectMetadata::getWalObject));
List<CompletableFuture<List<StreamDataBlock>>> objectStreamRangePositionFutures = new ArrayList<>();
for (S3WALObjectMetadata walObjectMetadata : objectMetadataList) {
DataBlockReader dataBlockReader = new DataBlockReader(walObjectMetadata.getObjectMetadata(), s3Operator);
dataBlockReader.parseDataBlockIndex();
objectStreamRangePositionFutures.add(dataBlockReader.getDataBlockIndex());
}
return objectStreamRangePositionFutures.stream().flatMap(f -> {
try {
List<StreamDataBlock> streamDataBlocks = f.join();
List<StreamDataBlock> validStreamDataBlocks = new ArrayList<>();
S3WALObject s3WALObject = s3WALObjectMap.get(streamDataBlocks.get(0).getObjectId());
// filter out invalid stream data blocks in case metadata is inconsistent with S3 index block
for (StreamDataBlock streamDataBlock : streamDataBlocks) {
if (s3WALObject.intersect(streamDataBlock.getStreamId(), streamDataBlock.getStartOffset(), streamDataBlock.getEndOffset())) {
validStreamDataBlocks.add(streamDataBlock);
}
}
return validStreamDataBlocks.stream();
} catch (Exception ex) {
// continue compaction without invalid object
LOGGER.error("Read on invalid object ", ex);
return null;
}
}).collect(Collectors.toList());
return compactObjects(sortStreamRangePositions(filteredMap));
}

private List<CompactedObjectBuilder> compactObjects(List<StreamDataBlock> streamDataBlocks) {
@@ -186,7 +167,7 @@ private List<CompactedObjectBuilder> compactObjects(List<StreamDataBlock> stream
builder.addStreamDataBlock(streamDataBlock);
} else {
// should not reach here
LOGGER.error("FATAL ERROR: illegal stream range position, last offset: {}, curr: {}",
logger.error("FATAL ERROR: illegal stream range position, last offset: {}, curr: {}",
builder.lastOffset(), streamDataBlock);
return new ArrayList<>();
}
@@ -198,12 +179,18 @@ private List<CompactedObjectBuilder> compactObjects(List<StreamDataBlock> stream
return compactedObjectBuilders;
}

boolean shouldCompact(List<StreamDataBlock> streamDataBlocks) {
// do compact if there is any stream with data placed in more than one WAL object
Map<Long, Integer> streamIdToDistinctObjectMap = streamDataBlocks.stream()
.collect(Collectors.groupingBy(StreamDataBlock::getStreamId, Collectors.mapping(StreamDataBlock::getObjectId, Collectors.toSet())))
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
return streamIdToDistinctObjectMap.values().stream().filter(v -> v > 1).count() > 1;
Map<Long, List<StreamDataBlock>> filterBlocksToCompact(Map<Long, List<StreamDataBlock>> streamDataBlocksMap) {
Map<Long, Set<Long>> streamToObjectIds = streamDataBlocksMap.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.groupingBy(StreamDataBlock::getStreamId, Collectors.mapping(StreamDataBlock::getObjectId, Collectors.toSet())));
Set<Long> objectIdsToCompact = streamToObjectIds
.entrySet().stream()
.filter(e -> e.getValue().size() > 1)
.flatMap(e -> e.getValue().stream())
.collect(Collectors.toSet());
return streamDataBlocksMap.entrySet().stream()
.filter(e -> objectIdsToCompact.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private CompactedObjectBuilder splitAndAddBlock(CompactedObjectBuilder builder,
@@ -228,11 +215,11 @@ private CompactedObjectBuilder splitObject(CompactedObjectBuilder builder,
return builder;
}

List<StreamDataBlock> sortStreamRangePositions(List<StreamDataBlock> streamDataBlocks) {
List<StreamDataBlock> sortStreamRangePositions(Map<Long, List<StreamDataBlock>> streamDataBlocksMap) {
//TODO: use merge sort
Map<Long, List<StreamDataBlock>> sortedStreamObjectMap = new TreeMap<>();
for (StreamDataBlock streamDataBlock : streamDataBlocks) {
sortedStreamObjectMap.computeIfAbsent(streamDataBlock.getStreamId(), k -> new ArrayList<>()).add(streamDataBlock);
for (List<StreamDataBlock> streamDataBlocks : streamDataBlocksMap.values()) {
streamDataBlocks.forEach(e -> sortedStreamObjectMap.computeIfAbsent(e.getStreamId(), k -> new ArrayList<>()).add(e));
}
return sortedStreamObjectMap.values().stream().flatMap(list -> {
list.sort(StreamDataBlock.STREAM_OFFSET_COMPARATOR);
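
Editor's note on the CompactionAnalyzer change above: the old boolean shouldCompact gate is replaced by filterBlocksToCompact, which keeps only the WAL objects that share at least one stream with another WAL object. A self-contained sketch of that selection rule on bare IDs follows; the Map<objectId, streamIds> shape and the FilterSketch class are illustrative stand-ins, not the PR's StreamDataBlock types.

import java.util.*;
import java.util.stream.*;

class FilterSketch {
    // Keep only the WAL objects that share at least one stream with another object.
    // Input and output: objectId -> streamIds contained in that WAL object.
    static Map<Long, List<Long>> filter(Map<Long, List<Long>> objectToStreams) {
        // streamId -> set of objects that hold data for that stream
        Map<Long, Set<Long>> streamToObjects = new HashMap<>();
        objectToStreams.forEach((objectId, streams) ->
                streams.forEach(s -> streamToObjects.computeIfAbsent(s, k -> new HashSet<>()).add(objectId)));
        // objects touched by any stream that is spread over more than one object
        Set<Long> candidates = streamToObjects.values().stream()
                .filter(objs -> objs.size() > 1)
                .flatMap(Set::stream)
                .collect(Collectors.toSet());
        return objectToStreams.entrySet().stream()
                .filter(e -> candidates.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> input = Map.of(
                10L, List.of(1L, 2L),  // shares stream 2 with object 11 -> kept
                11L, List.of(2L),      // kept
                12L, List.of(3L));     // stream 3 lives only here -> dropped
        System.out.println(filter(input).keySet()); // e.g. [10, 11]
    }
}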
77 changes: 56 additions & 21 deletions core/src/main/scala/kafka/log/s3/compact/CompactionManager.java
@@ -28,27 +28,29 @@
import kafka.log.s3.objects.StreamObject;
import kafka.log.s3.operator.S3Operator;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import org.apache.kafka.metadata.stream.S3WALObjectMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class CompactionManager {
private final static Logger LOGGER = LoggerFactory.getLogger(CompactionManager.class);
private final Logger logger;
private final ObjectManager objectManager;
private final StreamMetadataManager streamMetadataManager;
private final S3Operator s3Operator;
private final CompactionAnalyzer compactionAnalyzer;
private final ScheduledExecutorService executorService;
private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorService executorService;
private final CompactionUploader uploader;
private final long compactionCacheSize;
private final double executionScoreThreshold;
@@ -58,6 +60,7 @@ public class CompactionManager {
private final TokenBucketThrottle networkInThrottle;

public CompactionManager(KafkaConfig config, ObjectManager objectManager, StreamMetadataManager streamMetadataManager, S3Operator s3Operator) {
this.logger = new LogContext(String.format("[CompactionManager id=%d] ", config.brokerId())).logger(CompactionManager.class);
this.objectManager = objectManager;
this.streamMetadataManager = streamMetadataManager;
this.s3Operator = s3Operator;
@@ -68,51 +71,64 @@ public CompactionManager(KafkaConfig config, ObjectManager objectManager, Stream
this.forceSplitObjectPeriod = config.s3ObjectCompactionForceSplitPeriod();
this.networkInThrottle = new TokenBucketThrottle(config.s3ObjectCompactionNWInBandwidth());
this.uploader = new CompactionUploader(objectManager, s3Operator, config);
this.compactionAnalyzer = new CompactionAnalyzer(compactionCacheSize, executionScoreThreshold, streamSplitSize, s3Operator);
this.executorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("object-compaction-manager"));
this.compactionAnalyzer = new CompactionAnalyzer(compactionCacheSize, executionScoreThreshold, streamSplitSize,
s3Operator, new LogContext(String.format("[CompactionAnalyzer id=%d] ", config.brokerId())));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("object-compaction-manager"));
this.executorService = Executors.newFixedThreadPool(8, new DefaultThreadFactory("force-split-executor"));
this.logger.info("Compaction manager initialized with config: compactionInterval: {} min, compactionCacheSize: {} bytes, " +
"executionScoreThreshold: {}, streamSplitSize: {} bytes, forceSplitObjectPeriod: {} min",
compactionInterval, compactionCacheSize, executionScoreThreshold, streamSplitSize, forceSplitObjectPeriod);
}

public void start() {
this.executorService.scheduleWithFixedDelay(() -> {
this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
logger.info("Compaction started");
long start = System.currentTimeMillis();
this.compact()
.thenAccept(result -> LOGGER.info("Compaction complete, cost {} ms, result {}",
.thenAccept(result -> logger.info("Compaction complete, total cost {} ms, result {}",
System.currentTimeMillis() - start, result))
.exceptionally(ex -> {
LOGGER.error("Compaction failed, cost {} ms, ", System.currentTimeMillis() - start, ex);
logger.error("Compaction failed, cost {} ms, ", System.currentTimeMillis() - start, ex);
return null;
});
}, 0, this.compactionInterval, TimeUnit.MINUTES);
}, 1, this.compactionInterval, TimeUnit.MINUTES);
}

public void shutdown() {
this.executorService.shutdown();
this.scheduledExecutorService.shutdown();
this.networkInThrottle.stop();
this.uploader.stop();
}

public CompletableFuture<CompactResult> compact() {
List<S3WALObjectMetadata> s3ObjectMetadata = this.streamMetadataManager.getWALObjects();
logger.info("Get {} WAL objects from metadata", s3ObjectMetadata.size());
Map<Boolean, List<S3WALObjectMetadata>> objectMetadataFilterMap = s3ObjectMetadata.stream()
.collect(Collectors.partitioningBy(e -> (System.currentTimeMillis() - e.getWalObject().dataTimeInMs())
>= this.forceSplitObjectPeriod));
>= TimeUnit.MINUTES.toMillis(this.forceSplitObjectPeriod)));
// force split objects that have existed for too long
splitWALObjects(objectMetadataFilterMap.get(true)).thenAccept(v -> LOGGER.info("Force split {} objects",
objectMetadataFilterMap.get(true).size()));
logger.info("{} WAL objects need force split", objectMetadataFilterMap.get(true).size());
long splitStart = System.currentTimeMillis();
splitWALObjects(objectMetadataFilterMap.get(true)).thenAccept(v ->
logger.info("Force split {} objects, time cost: {} ms", objectMetadataFilterMap.get(true).size(), System.currentTimeMillis() - splitStart));

try {
logger.info("{} WAL objects as compact candidates", objectMetadataFilterMap.get(false).size());
long compactionStart = System.currentTimeMillis();
List<CompactionPlan> compactionPlans = this.compactionAnalyzer.analyze(objectMetadataFilterMap.get(false));
logger.info("Analyze compaction plans complete, {} plans generated", compactionPlans.size());
if (compactionPlans.isEmpty()) {
return CompletableFuture.completedFuture(CompactResult.SKIPPED);
}
CommitWALObjectRequest request = buildCompactRequest(compactionPlans, s3ObjectMetadata);
logger.info("Build compact request complete, time cost: {} ms, start committing objects", System.currentTimeMillis() - compactionStart);
return objectManager.commitWALObject(request).thenApply(nil -> {
LOGGER.info("Commit compact request succeed, WAL object id: {}, size: {}, stream object num: {}",
request.getObjectId(), request.getObjectSize(), request.getStreamObjects().size());
logger.info("Commit compact request succeed, WAL object id: {}, size: {}, stream object num: {}, time cost: {} ms",
request.getObjectId(), request.getObjectSize(), request.getStreamObjects().size(), System.currentTimeMillis() - compactionStart);
return CompactResult.SUCCESS;
});
} catch (Exception e) {
LOGGER.error("Error while compaction objects", e);
logger.error("Error while compaction objects", e);
return CompletableFuture.failedFuture(e);
}

@@ -121,7 +137,7 @@ public CompletableFuture<CompactResult> compact() {
public CompletableFuture<Void> forceSplitAll() {
CompletableFuture<Void> cf = new CompletableFuture<>();
//TODO: deal with metadata delay
this.executorService.execute(() -> splitWALObjects(this.streamMetadataManager.getWALObjects())
this.scheduledExecutorService.execute(() -> splitWALObjects(this.streamMetadataManager.getWALObjects())
.thenAccept(v -> cf.complete(null))
.exceptionally(ex -> {
cf.completeExceptionally(ex);
@@ -132,7 +148,27 @@ public CompletableFuture<Void> forceSplitAll() {
}

CompletableFuture<Void> splitWALObjects(List<S3WALObjectMetadata> objectMetadataList) {
return CompletableFuture.completedFuture(null);
if (objectMetadataList.isEmpty()) {
return CompletableFuture.completedFuture(null);
}

Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(objectMetadataList, s3Operator);
List<CompletableFuture<Void>> splitFutureList = new ArrayList<>();
for (Map.Entry<Long, List<StreamDataBlock>> entry : streamDataBlocksMap.entrySet()) {
List<StreamDataBlock> streamDataBlocks = entry.getValue();
for (StreamDataBlock streamDataBlock : streamDataBlocks) {
splitFutureList.add(objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30))
.thenAcceptAsync(objectId -> {
logger.info("Split {} to {}", streamDataBlock, objectId);
//TODO: implement this
}, executorService).exceptionally(ex -> {
logger.error("Prepare object failed", ex);
return null;
})
);
}
}
return CompletableFuture.allOf(splitFutureList.toArray(new CompletableFuture[0]));
}

CommitWALObjectRequest buildCompactRequest(List<CompactionPlan> compactionPlans, List<S3WALObjectMetadata> s3ObjectMetadata)
Expand All @@ -154,7 +190,7 @@ CommitWALObjectRequest buildCompactRequest(List<CompactionPlan> compactionPlans,
streamDataBlock.getDataCf().complete(dataBlocks.get(i).buffer());
}
}).exceptionally(ex -> {
LOGGER.error("read on invalid object {}, ex ", metadata.key(), ex);
logger.error("read on invalid object {}, ex ", metadata.key(), ex);
for (int i = 0; i < blockIndices.size(); i++) {
StreamDataBlock streamDataBlock = streamDataBlocks.get(i);
streamDataBlock.getDataCf().completeExceptionally(ex);
@@ -186,11 +222,10 @@ CommitWALObjectRequest buildCompactRequest(List<CompactionPlan> compactionPlans,
}
streamObjectCFList.stream().map(CompletableFuture::join).forEach(request::addStreamObject);
} catch (Exception ex) {
LOGGER.error("Error while uploading compaction objects", ex);
logger.error("Error while uploading compaction objects", ex);
uploader.reset();
throw new IllegalArgumentException("Error while uploading compaction objects", ex);
}
// compactionPlan.streamDataBlocksMap().values().forEach(e -> e.forEach(StreamDataBlock::free));
}
request.setObjectId(uploader.getWALObjectId());
// set wal object id to be the first object id of compacted objects
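
Editor's note on compact() above: forceSplitObjectPeriod is configured in minutes, so the age check now converts it with TimeUnit.MINUTES.toMillis before partitioning WAL objects into force-split and regular compaction candidates. A stripped-down sketch of that partitioning step follows, assuming a hypothetical WalMeta stand-in for S3WALObjectMetadata.

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

class ForceSplitPartitionSketch {
    // Stand-in for S3WALObjectMetadata: just an object id and its data timestamp.
    record WalMeta(long objectId, long dataTimeInMs) { }

    // true -> old enough to force split; false -> regular compaction candidate
    static Map<Boolean, List<WalMeta>> partition(List<WalMeta> objects, long forceSplitPeriodMinutes) {
        long now = System.currentTimeMillis();
        return objects.stream().collect(Collectors.partitioningBy(
                m -> now - m.dataTimeInMs() >= TimeUnit.MINUTES.toMillis(forceSplitPeriodMinutes)));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<WalMeta> objects = List.of(
                new WalMeta(1, now - TimeUnit.HOURS.toMillis(3)),    // 3 h old -> force split
                new WalMeta(2, now - TimeUnit.MINUTES.toMillis(5))); // 5 min old -> compact
        Map<Boolean, List<WalMeta>> parts = partition(objects, 120);
        System.out.println("force split: " + parts.get(true).size() + ", compact: " + parts.get(false).size());
    }
}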