diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java index 6a7954a253..22941e5836 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java @@ -57,7 +57,9 @@ public class LogCache { private static final int DEFAULT_MAX_BLOCK_STREAM_COUNT = 10000; private static final Consumer DEFAULT_BLOCK_FREE_LISTENER = block -> { }; + private static final int MAX_BLOCKS_COUNT = 64; final List blocks = new ArrayList<>(); + final AtomicInteger blockCount = new AtomicInteger(1); private final long capacity; private final long cacheBlockMaxSize; private final int maxCacheBlockStreamCount; @@ -214,6 +216,7 @@ public LogCacheBlock archiveCurrentBlock() { block.lastRecordOffset = lastRecordOffset; activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount); blocks.add(activeBlock); + blockCount.set(blocks.size()); return block; } finally { writeLock.unlock(); @@ -249,11 +252,12 @@ Optional archiveCurrentBlockIfContains0(long streamId) { public void markFree(LogCacheBlock block) { block.free = true; tryRealFree(); + tryMerge(); } private void tryRealFree() { long currSize = size.get(); - if (currSize <= capacity * 0.9) { + if (currSize <= capacity * 0.9 && blockCount.get() <= MAX_BLOCKS_COUNT) { return; } List removed = new ArrayList<>(); @@ -264,21 +268,35 @@ private void tryRealFree() { currSize = size.get(); Iterator iter = blocks.iterator(); while (iter.hasNext()) { - if (currSize - freeSize <= capacity * 0.9) { + LogCacheBlock block = iter.next(); + if (blockCount.get() <= MAX_BLOCKS_COUNT && currSize - freeSize <= capacity * 0.9) { break; } - LogCacheBlock block = iter.next(); if (block.free) { iter.remove(); freeSize += block.size(); removed.add(block); + blockCount.decrementAndGet(); } else { break; } } + } finally { + writeLock.unlock(); + } + size.addAndGet(-freeSize); + removed.forEach(b -> { + blockFreeListener.accept(b); + b.free(); + }); + } + + private void tryMerge() { + writeLock.lock(); + try { // merge blocks to speed up the get. LogCacheBlock mergedBlock = null; - iter = blocks.iterator(); + Iterator iter = blocks.iterator(); while (iter.hasNext()) { LogCacheBlock block = iter.next(); if (!block.free) { @@ -296,36 +314,6 @@ private void tryRealFree() { } finally { writeLock.unlock(); } - size.addAndGet(-freeSize); - removed.forEach(b -> { - blockFreeListener.accept(b); - b.free(); - }); - } - - public int forceFree(int required) { - AtomicInteger freedBytes = new AtomicInteger(); - List removed = new ArrayList<>(); - writeLock.lock(); - try { - blocks.removeIf(block -> { - if (!block.free || freedBytes.get() >= required) { - return false; - } - long blockSize = block.size(); - size.addAndGet(-blockSize); - freedBytes.addAndGet((int) blockSize); - removed.add(block); - return true; - }); - } finally { - writeLock.unlock(); - } - removed.forEach(b -> { - blockFreeListener.accept(b); - b.free(); - }); - return freedBytes.get(); } public void setLastRecordOffset(RecordOffset lastRecordOffset) {