From e017ca10e46706e62c6ec13243f889d34bb61fe6 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Wed, 26 Nov 2025 19:24:16 +0800 Subject: [PATCH] fix(metastream): Compaction may drop certain keys Signed-off-by: Robin Han --- core/src/main/scala/kafka/log/streamaspect/MetaStream.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java index cf73e2c525..0818ab04cb 100644 --- a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java +++ b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java @@ -334,10 +334,9 @@ private synchronized CompletableFuture doCompaction(boolean force) { return CompletableFuture.completedFuture(null); } CompletableFuture overwriteCf = CompletableFuture.allOf(overwrite.stream().map(this::append).toArray(CompletableFuture[]::new)); - return overwriteCf.thenAccept(nil -> { - OptionalLong minOffset = metaCache.values().stream().mapToLong(v -> v.offset).min(); - minOffset.ifPresent(this::trim); - }); + OptionalLong minOffset = metaCache.values().stream().mapToLong(v -> v.offset).min(); + // await overwrite complete then trim to the minimum offset in metaCache + return overwriteCf.thenAccept(nil -> minOffset.ifPresent(this::trim)); } static class MetadataValue {