Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.trace.context.TraceContext;
import com.automq.stream.s3.wal.RecordOffset;
import com.automq.stream.utils.Threads;
import com.automq.stream.utils.biniarysearch.StreamRecordBatchList;

import org.slf4j.Logger;
Expand All @@ -37,7 +38,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -59,6 +62,8 @@ public class LogCache {
private static final Consumer<LogCacheBlock> DEFAULT_BLOCK_FREE_LISTENER = block -> {
};
private static final int MAX_BLOCKS_COUNT = 64;
private static final ExecutorService LOG_CACHE_ASYNC_EXECUTOR = Threads.newFixedFastThreadLocalThreadPoolWithMonitor(
1, "LOG_CACHE_ASYNC", true, LOGGER);
Comment on lines +65 to +66
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LOG_CACHE_ASYNC_EXECUTOR is declared as a static field, which means it's shared across all LogCache instances in the JVM. This could lead to:

  1. Contention between multiple LogCache instances competing for the single thread
  2. Head-of-line blocking where a slow operation in one LogCache delays operations in another
  3. Difficulty in testing/isolation since the executor is shared globally

Consider making this executor an instance field, or if global sharing is intentional, add documentation explaining the rationale and implications.

Copilot uses AI. Check for mistakes.
static final int MERGE_BLOCK_THRESHOLD = 8;
final List<LogCacheBlock> blocks = new ArrayList<>();
final AtomicInteger blockCount = new AtomicInteger(1);
Expand Down Expand Up @@ -259,10 +264,19 @@ Optional<LogCacheBlock> archiveCurrentBlockIfContains0(long streamId) {

}

Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The markFree method lacks documentation about its asynchronous behavior. Consider adding JavaDoc that explains:

  1. That the method returns immediately after marking the block as free, with the merge operation happening asynchronously
  2. When callers should await the returned CompletableFuture (e.g., in tests or when merge completion is required)
  3. That exceptions from the async merge operation will complete the future exceptionally

Example:

/**
 * Marks a cache block as free and asynchronously attempts to merge adjacent free blocks.
 * 
 * @param block the block to mark as free
 * @return a CompletableFuture that completes when the merge operation finishes, or completes
 *         exceptionally if the merge fails
 */
Suggested change
/**
* Marks the given cache block as free and asynchronously attempts to merge adjacent free blocks.
* <p>
* This method returns immediately after marking the block as free. The merge operation is performed
* asynchronously on a background executor.
* <p>
* Callers should await the returned {@link CompletableFuture} if they require the merge operation to be
* completed before proceeding (e.g., in tests or when subsequent operations depend on merge completion).
* <p>
* If an exception occurs during the asynchronous merge, the returned future will be completed exceptionally.
*
* @param block the block to mark as free
* @return a {@link CompletableFuture} that completes when the merge operation finishes, or completes
* exceptionally if the merge fails
*/

Copilot uses AI. Check for mistakes.
public void markFree(LogCacheBlock block) {
public CompletableFuture<Void> markFree(LogCacheBlock block) {
block.free = true;
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting block.free = true is not synchronized with the lock that protects the blocks list. This could lead to a race condition where:

  1. Thread A marks block as free (line 268)
  2. Thread B reads the block list and sees block.free = false (before the write is visible)
  3. Thread A's tryMerge checks block.free = true

While the locks in tryMerge() and tryRealFree() protect the list modifications, the initial block.free = true assignment should be either:

  1. Marked as volatile in the LogCacheBlock class
  2. Performed inside a lock
  3. Use an AtomicBoolean

This ensures visibility across threads.

Copilot uses AI. Check for mistakes.
tryRealFree();
tryMerge();
CompletableFuture<Void> cf = new CompletableFuture<>();
LOG_CACHE_ASYNC_EXECUTOR.execute(() -> {
try {
tryMerge();
cf.complete(null);
} catch (Throwable t) {
cf.completeExceptionally(t);
}
});
return cf;
Comment on lines +271 to +279
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The markFree method uses a single-threaded executor (LOG_CACHE_ASYNC_EXECUTOR) for both tryMerge() and the cleanup operations in tryRealFree(). Since tryMerge() can be a long-running operation (it has an infinite loop), this could block cleanup operations from tryRealFree() that are queued after it. Consider:

  1. Using separate executors for merge and cleanup operations
  2. Adding timeout/cancellation mechanisms to prevent one operation from blocking others indefinitely
  3. Documenting the expected execution characteristics and ordering guarantees

Copilot uses AI. Check for mistakes.
Comment on lines +267 to +279
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The signature of markFree has changed from void to CompletableFuture<Void>, which is a breaking API change. Existing callers that don't await this future may experience race conditions where they assume the merge operation has completed when it hasn't. Consider:

  1. Adding documentation about the async behavior and when callers need to wait for completion
  2. Reviewing all call sites to ensure they properly handle the returned future (note: S3Storage.freeCache() and SnapshotReadCache.put() currently ignore it)

Copilot uses AI. Check for mistakes.
}

private void tryRealFree() {
Expand Down Expand Up @@ -295,10 +309,10 @@ private void tryRealFree() {
writeLock.unlock();
}
size.addAndGet(-freeSize);
removed.forEach(b -> {
LOG_CACHE_ASYNC_EXECUTOR.execute(() -> removed.forEach(b -> {
blockFreeListener.accept(b);
b.free();
Comment on lines 313 to 314
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lambda removed.forEach(b -> { blockFreeListener.accept(b); b.free(); }) is executed asynchronously without error handling. If blockFreeListener.accept(b) or b.free() throws an exception, it will be silently swallowed by the executor. Consider wrapping the operations in a try-catch block to log any failures:

LOG_CACHE_ASYNC_EXECUTOR.execute(() -> removed.forEach(b -> {
    try {
        blockFreeListener.accept(b);
        b.free();
    } catch (Throwable t) {
        LOGGER.error("Failed to free block", t);
    }
}));
Suggested change
blockFreeListener.accept(b);
b.free();
try {
blockFreeListener.accept(b);
b.free();
} catch (Throwable t) {
LOGGER.error("Failed to free block", t);
}

Copilot uses AI. Check for mistakes.
});
}));
}

private void tryMerge() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -164,7 +165,7 @@ public void testMergeBlock() {
}

@Test
public void testTryMergeLogic() {
public void testTryMergeLogic() throws ExecutionException, InterruptedException {
LogCache logCache = new LogCache(Long.MAX_VALUE, 10_000L);
final long streamId = 233L;
final int blocksToCreate = LogCache.MERGE_BLOCK_THRESHOLD + 2;
Expand All @@ -187,8 +188,8 @@ public void testTryMergeLogic() {
assertEquals(leftCache.endOffset(), rightCache.startOffset());

// mark both blocks free to trigger tryMerge (called inside markFree)
logCache.markFree(left);
logCache.markFree(right);
logCache.markFree(left).get();
logCache.markFree(right).get();

int after = logCache.blocks.size();
assertEquals(before - 1, after, "two adjacent free contiguous blocks should be merged into one");
Expand Down
Loading