CAMEL-23299: propagate transacted flag in AggregateProcessor for synchronous executor#22512
Conversation
|
🌟 Thank you for your contribution to the Apache Camel project! 🌟 🐫 Apache Camel Committers, please review the following items:
|
|
🧪 CI tested the following changed modules:
Build reactor — dependencies compiled but only changed modules were tested (2 modules)
|
gnodet
left a comment
There was a problem hiding this comment.
Nice fix, Federico — clean and well-targeted. A couple of observations:
Missing TRANSACTION_CONTEXT_DATA propagation?
The Splitter (line 216–219), MulticastProcessor (line 980–983), and RecipientListProcessor (line 309–312) all follow a two-step pattern after createCorrelatedCopy:
copy.getExchangeExtension().setTransacted(exchange.isTransacted());
if (exchange.isTransacted() && copy.getProperty(Exchange.TRANSACTION_CONTEXT_DATA) == null) {
copy.setProperty(Exchange.TRANSACTION_CONTEXT_DATA, ...);
}This PR only does step 1 (setting the transacted flag) but not step 2 (propagating TRANSACTION_CONTEXT_DATA). For the specific deadlock fix with manually-transacted exchanges this is fine, but in scenarios with a real transaction manager, the missing context data could cause issues downstream. Is this intentionally scoped differently or an oversight?
Minor: JIRA link
The PR body references CAMEL-23281 and CAMEL-23030 but doesn't include a direct link to CAMEL-23299 at the top.
Otherwise the fix looks good — well-commented, properly guarded for SynchronousExecutorService, and the test coverage with @Timeout(30) is solid for deadlock reproducers.
Claude Code on behalf of Guillaume Nodet
|
good point @gnodet I'll add the |
…hronous executor Exchange.copy() does not preserve the transacted flag. When the aggregate uses SynchronousExecutorService, the completion runs on the caller thread and can legitimately be transactional. Propagate the flag so Pipeline.process() uses scheduleQueue instead of scheduleMain, avoiding a queue-swap deadlock in the reactive executor's executeFromQueue loop.
70aaf4e to
cd08ec6
Compare
Backport of #22511 to main.
Summary
Exchange.copy()does not preserve the transacted flag. When the aggregate usesSynchronousExecutorService, the completion runs on the caller thread and can legitimately be transactionalPipeline.process()usesscheduleQueueinstead ofscheduleMain, avoiding a queue-swap deadlock in the reactive executor'sexecuteFromQueueloopSplitter(line 216)Test plan
SplitAggregateDirectRouteTransactedDeadlockTest— transacted split + aggregate routed to a separatedirect:route (deadlocked before, passes now)SplitAggregateInChoiceSynchronousExecutorTest— existing CAMEL-23281 tests still passSplitAggregateSynchronousExecutorStackOverflowIssueTest— CAMEL-23030 regression guard still passes