perf(s3stream): async heavy log cache operation #3016
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
Pull Request Overview
This PR improves performance by making heavy log cache operations asynchronous. The markFree method now returns a CompletableFuture<Void> and executes the merge operation (tryMerge()) asynchronously on a dedicated single-threaded executor. Additionally, block cleanup operations in tryRealFree() are also executed asynchronously.
- Changed markFree() return type from void to CompletableFuture<Void> for async merge operations
- Introduced a static single-threaded executor LOG_CACHE_ASYNC_EXECUTOR for async operations
- Made block cleanup operations in tryRealFree() async
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java | Introduced async executor and converted markFree() and tryRealFree() cleanup to async operations |
| s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java | Updated test to handle the new async markFree() API by calling .get() on returned futures |
    blockFreeListener.accept(b);
    b.free();
Copilot
AI
Nov 17, 2025
The lambda removed.forEach(b -> { blockFreeListener.accept(b); b.free(); }) runs asynchronously without error handling. If blockFreeListener.accept(b) or b.free() throws, the exception aborts the forEach, so the remaining blocks are never freed, and the failure can go unnoticed unless the executor reports uncaught exceptions. Consider wrapping the operations in a try-catch block to log any failures:
    LOG_CACHE_ASYNC_EXECUTOR.execute(() -> removed.forEach(b -> {
        try {
            blockFreeListener.accept(b);
            b.free();
        } catch (Throwable t) {
            LOGGER.error("Failed to free block", t);
        }
    }));
-   public void markFree(LogCacheBlock block) {
+   public CompletableFuture<Void> markFree(LogCacheBlock block) {
        block.free = true;
Copilot
AI
Nov 17, 2025
Setting block.free = true is not synchronized with the lock that protects the blocks list. This could lead to a race condition where:
- Thread A marks block as free (line 268)
- Thread B reads the block list and sees block.free = false (before the write is visible)
- Thread A's tryMerge checks block.free = true
While the locks in tryMerge() and tryRealFree() protect the list modifications, the initial block.free = true assignment is made outside them. The free field should be one of:
- Declared volatile in the LogCacheBlock class
- Written inside a lock
- Replaced with an AtomicBoolean
This ensures visibility across threads.
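A minimal sketch of two of these options follows. The classes VolatileBlock and AtomicBlock are hypothetical stand-ins for LogCacheBlock and are not taken from the PR; only the flag handling is meant to carry over.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for LogCacheBlock using a volatile flag.
final class VolatileBlock {
    // volatile: a write by one thread is visible to later reads by other threads.
    volatile boolean free;
}

// Hypothetical stand-in for LogCacheBlock using an AtomicBoolean.
final class AtomicBlock {
    private final AtomicBoolean free = new AtomicBoolean(false);

    /** Returns true only for the first caller, so a block is marked free at most once. */
    boolean markFreeOnce() {
        return free.compareAndSet(false, true);
    }

    boolean isFree() {
        return free.get();
    }
}

final class VisibilityDemo {
    /** Starts a reader that spins on the flag, then sets it from the calling thread. */
    static boolean readerSeesWrite() {
        VolatileBlock block = new VolatileBlock();
        CountDownLatch seen = new CountDownLatch(1);
        Thread reader = new Thread(() -> {
            while (!block.free) {
                Thread.onSpinWait();
            }
            seen.countDown();
        });
        reader.setDaemon(true);
        reader.start();
        block.free = true;
        try {
            return seen.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("reader saw write: " + readerSeesWrite());
    }
}
```

The AtomicBoolean variant costs slightly more per access but also rules out marking the same block free twice, which may or may not matter for LogCache.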
Copilot
AI
Nov 17, 2025
The markFree method lacks documentation about its asynchronous behavior. Consider adding JavaDoc that explains:
- That the method returns immediately after marking the block as free, with the merge operation happening asynchronously
- When callers should await the returned CompletableFuture (e.g., in tests or when merge completion is required)
- That exceptions from the async merge operation will complete the future exceptionally
Example:
    /**
     * Marks a cache block as free and asynchronously attempts to merge adjacent free blocks.
     *
     * @param block the block to mark as free
     * @return a CompletableFuture that completes when the merge operation finishes, or completes
     *         exceptionally if the merge fails
     */
Suggested change:
    /**
     * Marks the given cache block as free and asynchronously attempts to merge adjacent free blocks.
     * <p>
     * This method returns immediately after marking the block as free. The merge operation is performed
     * asynchronously on a background executor.
     * <p>
     * Callers should await the returned {@link CompletableFuture} if they require the merge operation to be
     * completed before proceeding (e.g., in tests or when subsequent operations depend on merge completion).
     * <p>
     * If an exception occurs during the asynchronous merge, the returned future will be completed exceptionally.
     *
     * @param block the block to mark as free
     * @return a {@link CompletableFuture} that completes when the merge operation finishes, or completes
     *         exceptionally if the merge fails
     */
        LOG_CACHE_ASYNC_EXECUTOR.execute(() -> {
            try {
                tryMerge();
                cf.complete(null);
            } catch (Throwable t) {
                cf.completeExceptionally(t);
            }
        });
        return cf;
Copilot
AI
Nov 17, 2025
The markFree method uses a single-threaded executor (LOG_CACHE_ASYNC_EXECUTOR) for both tryMerge() and the cleanup operations in tryRealFree(). Since tryMerge() can be a long-running operation (it has an infinite loop), this could block cleanup operations from tryRealFree() that are queued after it. Consider:
- Using separate executors for merge and cleanup operations
- Adding timeout/cancellation mechanisms to prevent one operation from blocking others indefinitely
- Documenting the expected execution characteristics and ordering guarantees
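The first option can be sketched as follows, assuming plain java.util.concurrent executors rather than the project's Threads helper. The names mergeExecutor and cleanupExecutor are illustrative and do not appear in the PR; a latch stands in for a long-running tryMerge().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SeparateExecutorsSketch {
    /** Returns true if a cleanup task finishes while a merge task is still running. */
    static boolean cleanupRunsWhileMergeIsBusy() {
        // Hypothetical executors: one dedicated to merges, one to block cleanup.
        ExecutorService mergeExecutor = Executors.newSingleThreadExecutor();
        ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch releaseMerge = new CountDownLatch(1);
        try {
            // Simulated long-running merge: blocks until the latch is released.
            mergeExecutor.execute(() -> {
                try {
                    releaseMerge.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Simulated cleanup: not queued behind the merge, so it can finish right away.
            Future<?> cleanup = cleanupExecutor.submit(() -> { });
            cleanup.get(5, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return false;
        } finally {
            releaseMerge.countDown();
            mergeExecutor.shutdown();
            cleanupExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("cleanup ran independently: " + cleanupRunsWhileMergeIsBusy());
    }
}
```

With a single shared thread, the same cleanup task would wait until the merge released the thread. The trade-off is that two executors give up the strict ordering between merge and cleanup that one thread provides.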
+   public CompletableFuture<Void> markFree(LogCacheBlock block) {
        block.free = true;
        tryRealFree();
-       tryMerge();
+       CompletableFuture<Void> cf = new CompletableFuture<>();
+       LOG_CACHE_ASYNC_EXECUTOR.execute(() -> {
+           try {
+               tryMerge();
+               cf.complete(null);
+           } catch (Throwable t) {
+               cf.completeExceptionally(t);
+           }
+       });
+       return cf;
Copilot
AI
Nov 17, 2025
The signature of markFree has changed from void to CompletableFuture<Void>, which is a breaking API change. Existing callers that don't await this future may experience race conditions where they assume the merge operation has completed when it hasn't. Consider:
- Adding documentation about the async behavior and when callers need to wait for completion
- Reviewing all call sites to ensure they properly handle the returned future (note: S3Storage.freeCache() and SnapshotReadCache.put() currently ignore it)
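One way a call site could handle the future without blocking is sketched below. The markFree here is a hypothetical stand-in that mirrors the completion pattern in the diff; it is not the LogCache method, and the failures list is an illustrative substitute for logging.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

final class FutureHandlingSketch {
    /** Hypothetical stand-in for the async markFree(): runs the merge on the given executor. */
    static CompletableFuture<Void> markFree(Executor executor, Runnable merge) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                merge.run();
                cf.complete(null);
            } catch (Throwable t) {
                cf.completeExceptionally(t);
            }
        });
        return cf;
    }

    /** Fire-and-forget caller that records a failure instead of dropping the future. */
    static CompletableFuture<Void> freeAndReport(Executor executor, Runnable merge, List<Throwable> failures) {
        return markFree(executor, merge).whenComplete((ignored, error) -> {
            if (error != null) {
                failures.add(error); // a real caller would log here
            }
        });
    }

    public static void main(String[] args) {
        List<Throwable> failures = new CopyOnWriteArrayList<>();
        Executor sameThread = Runnable::run; // runs tasks inline to keep the demo deterministic
        freeAndReport(sameThread, () -> { throw new IllegalStateException("merge failed"); }, failures);
        System.out.println("recorded failures: " + failures.size());
    }
}
```

A caller that needs the merge to have finished before continuing, as the updated test does, would call join() or get() on the returned future instead.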
    private static final ExecutorService LOG_CACHE_ASYNC_EXECUTOR = Threads.newFixedFastThreadLocalThreadPoolWithMonitor(
        1, "LOG_CACHE_ASYNC", true, LOGGER);
Copilot
AI
Nov 17, 2025
The LOG_CACHE_ASYNC_EXECUTOR is declared as a static field, which means it's shared across all LogCache instances in the JVM. This could lead to:
- Contention between multiple LogCache instances competing for the single thread
- Head-of-line blocking where a slow operation in one LogCache delays operations in another
- Difficulty in testing/isolation since the executor is shared globally
Consider making this executor an instance field, or if global sharing is intentional, add documentation explaining the rationale and implications.
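The instance-field alternative might look like the sketch below. InstanceScopedCacheSketch is a hypothetical class built on the standard Executors factory, not on the project's Threads helper, and it assumes the owner has a lifecycle hook (here close()) in which to shut the executor down.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical cache that owns its async executor instead of sharing a static one.
final class InstanceScopedCacheSketch implements AutoCloseable {
    private final ExecutorService asyncExecutor;

    InstanceScopedCacheSketch(String name) {
        this.asyncExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name + "-async");
            t.setDaemon(true);
            return t;
        });
    }

    CompletableFuture<Void> runAsync(Runnable task) {
        return CompletableFuture.runAsync(task, asyncExecutor);
    }

    @Override
    public void close() {
        asyncExecutor.shutdown();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true if a task on one instance finishes while another instance is stuck. */
    static boolean instancesAreIsolated() {
        CountDownLatch release = new CountDownLatch(1);
        try (InstanceScopedCacheSketch slow = new InstanceScopedCacheSketch("slow");
             InstanceScopedCacheSketch fast = new InstanceScopedCacheSketch("fast")) {
            slow.runAsync(() -> awaitQuietly(release)); // occupies only the slow instance's thread
            fast.runAsync(() -> { }).get(5, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return false;
        } finally {
            release.countDown();
        }
    }

    public static void main(String[] args) {
        System.out.println("instances isolated: " + instancesAreIsolated());
    }
}
```

The cost is one extra thread per cache instance, so a shared static executor can still be the better choice when many short-lived instances exist.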
No description provided.