diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
index 1477658b43..1fd7e07eb0 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -59,6 +59,7 @@ public class LogCache {
     private static final Consumer<LogCacheBlock> DEFAULT_BLOCK_FREE_LISTENER = block -> {
     };
     private static final int MAX_BLOCKS_COUNT = 64;
+    static final int MERGE_BLOCK_THRESHOLD = 8;
     final List<LogCacheBlock> blocks = new ArrayList<>();
     final AtomicInteger blockCount = new AtomicInteger(1);
     private final long capacity;
@@ -301,27 +302,49 @@ private void tryRealFree() {
     }
 
     private void tryMerge() {
-        writeLock.lock();
-        try {
-            // merge blocks to speed up the get.
-            LogCacheBlock mergedBlock = null;
-            Iterator<LogCacheBlock> iter = blocks.iterator();
-            while (iter.hasNext()) {
-                LogCacheBlock block = iter.next();
-                if (!block.free) {
-                    break;
+        // merge blocks to speed up the get.
+        int mergeStartIndex = 0;
+        for (; ; ) {
+            LogCacheBlock left;
+            LogCacheBlock right;
+            writeLock.lock();
+            try {
+                if (blocks.size() <= MERGE_BLOCK_THRESHOLD || mergeStartIndex + 1 >= blocks.size()) {
+                    return;
+                }
+                left = blocks.get(mergeStartIndex);
+                right = blocks.get(mergeStartIndex + 1);
+                if (!left.free || !right.free) {
+                    return;
                 }
-                if (mergedBlock == null
-                    || mergedBlock.size() + block.size() >= cacheBlockMaxSize
-                    || isDiscontinuous(mergedBlock, block)) {
-                    mergedBlock = block;
+                if (left.size() + right.size() >= cacheBlockMaxSize) {
+                    mergeStartIndex++;
                     continue;
                 }
-                mergeBlock(mergedBlock, block);
-                iter.remove();
+            } finally {
+                writeLock.unlock();
+            }
+            // Move costly operations (isDiscontinuous, mergeBlock) out of the lock.
+            if (isDiscontinuous(left, right)) {
+                mergeStartIndex++;
+                continue;
+            }
+            LogCacheBlock newBlock = new LogCacheBlock(Integer.MAX_VALUE);
+            mergeBlock(newBlock, left);
+            mergeBlock(newBlock, right);
+            newBlock.free = true;
+            writeLock.lock();
+            try {
+                if (blocks.size() > mergeStartIndex + 1
+                    && blocks.get(mergeStartIndex) == left
+                    && blocks.get(mergeStartIndex + 1) == right
+                ) {
+                    blocks.set(mergeStartIndex, newBlock);
+                    blocks.remove(mergeStartIndex + 1);
+                }
+            } finally {
+                writeLock.unlock();
             }
-        } finally {
-            writeLock.unlock();
         }
     }
diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java
index 557c012736..de08088c4a 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java
@@ -161,8 +161,47 @@ public void testMergeBlock() {
         assertEquals(201, stream235.endOffset());
         assertEquals(1, stream235.records.size());
         assertEquals(200, stream235.records.get(0).getBaseOffset());
+    }
+
+    @Test
+    public void testTryMergeLogic() {
+        LogCache logCache = new LogCache(Long.MAX_VALUE, 10_000L);
+        final long streamId = 233L;
+        final int blocksToCreate = LogCache.MERGE_BLOCK_THRESHOLD + 2;
+
+        // create multiple blocks, each containing one record for the same stream with contiguous offsets
+        for (int i = 0; i < blocksToCreate; i++) {
+            logCache.put(new StreamRecordBatch(streamId, 0L, i, 1, TestUtils.random(1)));
+            logCache.archiveCurrentBlock();
+        }
+        int before = logCache.blocks.size();
+        assertTrue(before > LogCache.MERGE_BLOCK_THRESHOLD, "need more than 8 blocks to exercise tryMerge");
+
+        LogCache.LogCacheBlock left = logCache.blocks.get(0);
+        LogCache.LogCacheBlock right = logCache.blocks.get(1);
+
+        // verify contiguous condition before merge: left.end == right.start
+        LogCache.StreamCache leftCache = left.map.get(streamId);
+        LogCache.StreamCache rightCache = right.map.get(streamId);
+        assertEquals(leftCache.endOffset(), rightCache.startOffset());
+
+        // mark both blocks free to trigger tryMerge (called inside markFree)
+        logCache.markFree(left);
+        logCache.markFree(right);
+
+        int after = logCache.blocks.size();
+        assertEquals(before - 1, after, "two adjacent free contiguous blocks should be merged into one");
+
+        // verify merged block contains both records and correct range
+        LogCache.LogCacheBlock merged = logCache.blocks.get(0);
+        assertTrue(merged.free);
+        LogCache.StreamCache mergedCache = merged.map.get(streamId);
+        assertEquals(2, mergedCache.records.size());
+        assertEquals(0L, mergedCache.startOffset());
+        assertEquals(2L, mergedCache.endOffset());
+        assertEquals(0L, mergedCache.records.get(0).getBaseOffset());
+        assertEquals(1L, mergedCache.records.get(1).getBaseOffset());
     }
 }
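
Reviewer note: the sketch below is not part of this patch; the class and method names (OptimisticMergeSketch, tryMergeHead) are invented for illustration. It shows, in isolation, the lock-splitting pattern the rewritten tryMerge() relies on: capture references under the lock, do the expensive merge outside the lock, then re-acquire the lock and commit only if the two source elements are still in place, compared by identity.

// Minimal, self-contained sketch of check-under-lock / merge-outside-lock / revalidate-and-commit.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BinaryOperator;

class OptimisticMergeSketch<T> {
    private final List<T> items = new ArrayList<>();
    private final ReentrantLock lock = new ReentrantLock();

    void add(T item) {
        lock.lock();
        try {
            items.add(item);
        } finally {
            lock.unlock();
        }
    }

    /** Merge the first two items, holding the lock only for the cheap check and the commit. */
    boolean tryMergeHead(BinaryOperator<T> merger) {
        T left;
        T right;
        lock.lock();
        try {
            if (items.size() < 2) {
                return false; // nothing to merge
            }
            left = items.get(0);
            right = items.get(1);
        } finally {
            lock.unlock();
        }

        // The potentially expensive merge runs outside the lock, on the captured references.
        T merged = merger.apply(left, right);

        lock.lock();
        try {
            // Commit only if the list still starts with exactly the objects we merged;
            // an identity check (==) means any concurrent change aborts this round.
            if (items.size() >= 2 && items.get(0) == left && items.get(1) == right) {
                items.set(0, merged);
                items.remove(1);
                return true;
            }
            return false; // list changed concurrently; discard the merged result
        } finally {
            lock.unlock();
        }
    }
}

As in the patch, the merged result is built into a fresh object and swapped in atomically under the lock, so a concurrent modification between the two critical sections simply causes the commit to be skipped rather than corrupting the list.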