diff --git a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java index 8e4cdb86b3..227837ff48 100644 --- a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java +++ b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java @@ -17,15 +17,15 @@ package kafka.log.streamaspect; -import com.automq.stream.s3.context.AppendContext; -import com.automq.stream.s3.context.FetchContext; -import io.netty.buffer.Unpooled; import com.automq.stream.DefaultRecordBatch; import com.automq.stream.api.AppendResult; import com.automq.stream.api.FetchResult; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.RecordBatchWithContext; import com.automq.stream.api.Stream; +import com.automq.stream.s3.context.AppendContext; +import com.automq.stream.s3.context.FetchContext; +import io.netty.buffer.Unpooled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +55,7 @@ public class MetaStream implements Stream { public static final String LEADER_EPOCH_CHECKPOINT_KEY = "LEADER_EPOCH_CHECKPOINT"; public static final Logger LOGGER = LoggerFactory.getLogger(MetaStream.class); - private static final double COMPACTION_HOLLOW_RATE = 0.95; + private static final double COMPACTION_HOLLOW_RATE = 0.6; private static final long COMPACTION_THRESHOLD_MS = TimeUnit.MINUTES.toMillis(1); private final Stream innerStream; @@ -163,7 +163,7 @@ public CompletableFuture close() { if (compactionFuture != null) { compactionFuture.cancel(true); } - return doCompaction() + return doCompaction(true) .thenRun(innerStream::close) .thenRun(() -> fenced = true); } @@ -264,13 +264,16 @@ private Map getValidMetaMap() { private void tryCompaction() { if (compactionFuture != null) { - compactionFuture.cancel(true); + return; } // trigger after 10s to avoid compacting too quick - compactionFuture = scheduler.schedule(this::doCompaction, 10, TimeUnit.SECONDS); + compactionFuture = scheduler.schedule(() -> { + doCompaction(false); + this.compactionFuture = null; + }, 10, TimeUnit.SECONDS); } - private synchronized CompletableFuture doCompaction() { + private synchronized CompletableFuture doCompaction(boolean force) { if (!replayDone) { return CompletableFuture.completedFuture(null); } @@ -280,8 +283,8 @@ private synchronized CompletableFuture doCompaction() { if (size == 0) { return CompletableFuture.completedFuture(null); } - double hollowRate = (double) metaCache.size() / size; - if (hollowRate < COMPACTION_HOLLOW_RATE) { + double hollowRate = 1 - (double) metaCache.size() / size; + if (!force && hollowRate >= COMPACTION_HOLLOW_RATE) { return CompletableFuture.completedFuture(null); } MetadataValue last = null; @@ -294,11 +297,14 @@ private synchronized CompletableFuture doCompaction() { for (Map.Entry entry : metaCache.entrySet()) { String key = entry.getKey(); MetadataValue value = entry.getValue(); - if (value == last || last.timestamp - value.timestamp < COMPACTION_THRESHOLD_MS) { + if (value == last || (!force && last.timestamp - value.timestamp < COMPACTION_THRESHOLD_MS)) { continue; } overwrite.add(MetaKeyValue.of(key, value.value())); } + if (overwrite.isEmpty()) { + return CompletableFuture.completedFuture(null); + } CompletableFuture overwriteCf = CompletableFuture.allOf(overwrite.stream().map(this::append).toArray(CompletableFuture[]::new)); return overwriteCf.thenAccept(nil -> { OptionalLong minOffset = metaCache.values().stream().mapToLong(v -> v.offset).min();