diff --git a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java index cf73e2c525..0818ab04cb 100644 --- a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java +++ b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java @@ -334,10 +334,9 @@ private synchronized CompletableFuture doCompaction(boolean force) { return CompletableFuture.completedFuture(null); } CompletableFuture overwriteCf = CompletableFuture.allOf(overwrite.stream().map(this::append).toArray(CompletableFuture[]::new)); - return overwriteCf.thenAccept(nil -> { - OptionalLong minOffset = metaCache.values().stream().mapToLong(v -> v.offset).min(); - minOffset.ifPresent(this::trim); - }); + OptionalLong minOffset = metaCache.values().stream().mapToLong(v -> v.offset).min(); + // await overwrite complete then trim to the minimum offset in metaCache + return overwriteCf.thenAccept(nil -> minOffset.ifPresent(this::trim)); } static class MetadataValue {