[fix][broker] Fix race condition in ServerCnx producer/consumer async callbacks#25352
Merged
lhotari merged 2 commits into apache:master on Mar 19, 2026
Conversation
… callbacks

The ServerCnx producers and consumers ConcurrentLongHashMap maps are designed with concurrencyLevel=1, assuming all accesses happen on the same Netty IO thread (ctx.executor()). However, most async callbacks in the producer and consumer lifecycle used synchronous variants (thenAccept, thenCompose, etc.) instead of async variants with ctx.executor().

The issue is how CompletableFuture chaining works without an explicit executor:

- If the upstream future is NOT yet completed when thenCompose(fn) is called, fn is queued as a dependent and runs when the completing thread calls complete(). Multiple chained stages execute in registration order since the completing thread walks the chain sequentially.
- If the upstream future is ALREADY completed when thenCompose(fn) is called, fn runs IMMEDIATELY on the calling thread, right there in the thenCompose call. It skips the queue entirely.

This distinction causes the create/close/create producer race: both createProducer1 and createProducer2 call getOrCreateTopic() and chain thenCompose(topic -> addProducer(...)). When the topic future is not yet completed, both callbacks are queued as dependents and execute in order. But if the topic future is already completed when createProducer2 chains on it, producer2's addProducer() runs immediately inline, potentially before producer1's cleanup from the close command has finished. Producer1 wins the race to register, producer2's addProducer() fails with "already connected", and the client never gets a success response.

The fix: using thenComposeAsync(fn, ctx.executor()) forces fn to always be submitted to the executor's task queue, regardless of whether the future is already completed. This guarantees FIFO ordering: all stages go through the same queue, so the close handler runs before producer2's creation, and producer1's cleanup completes before producer2 tries to register.
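The two completion behaviors described above can be observed directly with a small standalone sketch (class and method names here are illustrative, not from the Pulsar codebase): a dependent stage chained onto an already-completed future with the sync variant runs inline on the caller, while the async variant with an explicit executor always goes through that executor's queue.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InlineVsAsyncDemo {

    /** Name of the thread that ran a dependent stage chained onto an
     *  ALREADY-completed future using the sync variant (thenApply). */
    static String syncStageThread() {
        return CompletableFuture.completedFuture("topic")
                .thenApply(t -> Thread.currentThread().getName())
                .join();
    }

    /** Same stage, but with the async variant and an explicit executor:
     *  the stage is always submitted to that executor's queue. */
    static String asyncStageThread(ExecutorService executor) {
        return CompletableFuture.completedFuture("topic")
                .thenApplyAsync(t -> Thread.currentThread().getName(), executor)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "io-thread"));
        // Upstream already complete: the sync stage runs inline on the caller.
        System.out.println(syncStageThread().equals(Thread.currentThread().getName())); // true
        // The async stage still goes through the executor's queue.
        System.out.println(asyncStageThread(io)); // io-thread
        io.shutdown();
    }
}
```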
Changes:
- handleProducer chain: thenApplyAsync, thenComposeAsync, thenRunAsync, exceptionallyAsync with ctx.executor()
- buildProducerAndAddTopic: thenAcceptAsync, thenRunAsync
- handleSubscribe chain: thenApplyAsync, thenAcceptAsync, exceptionallyAsync
- handleCloseProducer: thenAcceptAsync
- safelyRemoveProducer/Consumer: whenCompleteAsync
- ServerCnxTest: add channel.runPendingTasks() in getResponse() polling loop so EmbeddedChannel executor tasks are processed
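The ordering guarantee these changes rely on can be reproduced in miniature (a sketch with illustrative names; a single-threaded executor stands in for the Netty IO thread, and a latch makes the interleaving deterministic): the sync variant jumps ahead of a task already on the queue, while the async variant is serialized behind it in FIFO order.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FifoOrderingDemo {

    /** Runs the race in miniature and returns the observed execution order. */
    static List<String> observedOrder() throws InterruptedException {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch closeRunning = new CountDownLatch(1);

        // The "close producer1" handler is already on the IO thread's queue
        // (held back by a latch so the interleaving is deterministic here).
        ioThread.submit(() -> {
            try {
                closeRunning.await();
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            order.add("close-producer1");
        });

        // The topic future is ALREADY complete, as in the racy case.
        CompletableFuture<Void> topicReady = CompletableFuture.completedFuture(null);

        // Sync variant: runs inline on the caller, jumping ahead of the close handler.
        topicReady.thenRun(() -> order.add("create-producer2-inline"));

        // Async variant with the executor: queued behind the close handler (FIFO).
        topicReady.thenRunAsync(() -> order.add("create-producer2-queued"), ioThread);

        closeRunning.countDown();
        ioThread.shutdown();
        ioThread.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints: [create-producer2-inline, close-producer1, create-producer2-queued]
        System.out.println(observedOrder());
    }
}
```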
…-callback-threading
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## master #25352 +/- ##
=============================================
+ Coverage 37.45% 72.73% +35.27%
+ Complexity 13161 2457 -10704
=============================================
Files 1897 1954 +57
Lines 150557 154768 +4211
Branches 17215 17718 +503
=============================================
+ Hits 56398 112572 +56174
+ Misses 86428 33146 -53282
- Partials 7731 9050 +1319
lhotari approved these changes on Mar 19, 2026
lhotari (Member) left a comment:
LGTM. This explains some related issues that we have seen in the past.
Technoboy- pushed a commit to Technoboy-/pulsar that referenced this pull request on Mar 19, 2026
Technoboy- pushed a commit that referenced this pull request on Mar 19, 2026
Flaky test failure
Summary
Fix a race condition in `ServerCnx` where producer/consumer lifecycle callbacks could execute out of order, causing the create/close/create producer sequence to fail intermittently.
Root Cause
The `ServerCnx` producers and consumers `ConcurrentLongHashMap` maps are designed with `concurrencyLevel=1`, assuming all accesses happen on the same Netty IO thread (`ctx.executor()`). However, most async callbacks used synchronous `CompletableFuture` variants (`thenAccept`, `thenCompose`, etc.) instead of async variants with `ctx.executor()`.
The issue is how `CompletableFuture` chaining works without an explicit executor:
- If the upstream future is NOT yet completed when `thenCompose(fn)` is called, `fn` is queued as a dependent and runs when the completing thread calls `complete()`. Multiple chained stages execute in registration order since the completing thread walks the chain sequentially.
- If the upstream future is ALREADY completed when `thenCompose(fn)` is called, `fn` runs IMMEDIATELY on the calling thread, right there in the `thenCompose` call. It skips the queue entirely.
This distinction causes the create/close/create producer race: both `createProducer1` and `createProducer2` call `getOrCreateTopic()` and chain `thenCompose(topic -> addProducer(...))`. When the topic future is not yet completed, both callbacks are queued as dependents and execute in order. But if the topic future is already completed when `createProducer2` chains on it, producer2's `addProducer()` runs immediately inline, potentially before producer1's cleanup from the close command has finished. Producer1 wins the race to register, producer2's `addProducer()` fails with "already connected", and the client never gets a success response.
Fix
Using `thenComposeAsync(fn, ctx.executor())` forces `fn` to always be submitted to the executor's task queue, regardless of whether the future is already completed. This guarantees FIFO ordering: all stages go through the same queue, so the close handler runs before producer2's creation, and producer1's cleanup completes before producer2 tries to register.
Changes
- `handleProducer` chain: `thenApplyAsync`, `thenComposeAsync`, `thenRunAsync`, `exceptionallyAsync` with `ctx.executor()`
- `buildProducerAndAddTopic`: `thenAcceptAsync`, `thenRunAsync`
- `handleSubscribe` chain: `thenApplyAsync`, `thenAcceptAsync`, `exceptionallyAsync`
- `handleCloseProducer`: `thenAcceptAsync`
- `safelyRemoveProducer/Consumer`: `whenCompleteAsync`
- `ServerCnxTest`: add `channel.runPendingTasks()` in `getResponse()` polling loop so `EmbeddedChannel` executor tasks are processed
Documentation
doc-not-needed (Your PR doesn't need any doc update)
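The `ServerCnxTest` change listed above is needed because `EmbeddedChannel`'s event executor only runs tasks when the test pumps them. A minimal stdlib stand-in (the `ManualExecutorDemo` class and its names are illustrative, not Pulsar or Netty code) shows why a polling loop must call a `runPendingTasks()`-style drain before it can observe a response produced by an `...Async` stage:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Minimal stand-in for an embedded event loop: submitted tasks only run
 *  when the test pumps the queue, mirroring channel.runPendingTasks(). */
public class ManualExecutorDemo implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    @Override
    public void execute(Runnable task) {
        tasks.add(task);
    }

    /** Drain analogue of EmbeddedChannel.runPendingTasks(). */
    public void runPendingTasks() {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) {
        ManualExecutorDemo loop = new ManualExecutorDemo();
        // Even though the upstream is complete, the async stage is queued.
        CompletableFuture<String> response = CompletableFuture.completedFuture("ok")
                .thenApplyAsync(r -> r + "-sent", loop);

        // Without pumping, the stage sits in the queue and never completes:
        System.out.println(response.isDone()); // false

        // The polling loop must pump pending tasks to see the response:
        loop.runPendingTasks();
        System.out.println(response.join()); // ok-sent
    }
}
```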
Matching PR in forked repository
No response
Tip
Add the labels `ready-to-test` and `area/test` to trigger the CI.