DATAGO-134580: Recover error-queue producer from unsolicited CloseFlow#142
Merged
Conversation
The reactive + proactive recreate-on-stale logic added in PR #141 (commits 931f09c..134e7ef) protects each binding's per-binding XMLMessageProducer in JCSMPOutboundMessageHandler. The error-queue republish path in ErrorQueueInfrastructure has the same exposure but on a different producer: it borrows the session-default producer from JCSMPSessionProducerManager via producerManager.get(producerKey) and historically had no recovery logic when the broker tore that producer down via unsolicited CloseFlow. Failure mode without this fix: when the broker fans out CloseFlow (message-spool maintenance, DR failover, "503: Service Unavailable"), the shared session-default producer is marked closed by JCSMP. Every subsequent error-queue republish in ErrorQueueInfrastructure.send() throws StaleSessionException / JCSMPTransportException / ClosedFacilityException; ErrorQueueRepublishCorrelationKey.handleError() catches at the message-retry level and re-attempts up to maxDeliveryAttempts - all attempts re-using the same dead producer reference, all doomed to fail. After max attempts the message is re-queued onto the original consumer queue, the consumer redelivers it, fails again, hits the error-queue path again, fails again. Net effect: failed-consumer messages disappear from the system after a DR failover or spool maintenance event. The fix mirrors the outbound-handler approach: - Proactive: at the top of send(), after producerManager.get(...), check producer.isClosed(). If true, call the new producerManager.forceRecreate() to rebuild the shared producer before send is attempted. - Reactive: wrap producer.send(...) in a try-catch. On StaleSessionException, JCSMPTransportException, ClosedFacilityException, or post-failure producer.isClosed(), call forceRecreate() so the next ErrorQueueRepublishCorrelationKey retry-loop iteration picks up a fresh producer. The original exception still propagates so the retry caller can do its errorQueueDeliveryAttempt++ bookkeeping. The shared producer is reference-counted across the entire session (JCSMPOutboundMessageHandler also registers itself for ref-count purposes even though it uses its own per-binding producer for sends). release() + get() does NOT work as a recovery primitive in production because it only closes the resource when registeredIds.size() <= 1 - in any deployment with at least one outbound binding, the ref-count stays > 1 and release() leaves the dead resource in place. The new forceRecreate() in SharedResourceManager sidesteps the ref-count: it unconditionally closes the current resource and create()s a new one under the existing lock, leaving registrations intact so every already-registered caller picks up the fresh resource on their next get(). Added as a generic method on SharedResourceManager since the recovery contract is independent of the JCSMP specifics. Tests (ErrorQueueInfrastructureTest, new): - test_errorQueueProducerRecreatedProactivelyOnIsClosed: closed producer detected before send -> forceRecreate -> fresh producer services the publish; stale producer never sent through (Mockito.never()). - test_errorQueueProducerRecreatedReactivelyOnStaleSendException: @CartesianTest over Stale / JCSMPTransport / ClosedFacility - verifies all three exception types trigger forceRecreate AND propagate to the retry caller (so handleError can drive its loop). - test_errorQueueProducerNotRecreatedOnUnrelatedJCSMPException: negative control - a non-stale JCSMPException (e.g. malformed message) propagates normally and does NOT churn the shared producer, guarding against an over-broad reactive arm. 417 binder-core unit tests green (was 411 + 6 new from this commit). This branch is layered on DATAGO-134580 (PR #141) so the new SharedResourceManager.forceRecreate() and the ErrorQueueInfrastructure changes can be reviewed alongside the related outbound-handler work.
There was a problem hiding this comment.
Pull request overview
This PR extends the stale-flow recovery work (DATAGO-134580) to the error-queue republish path by detecting and recovering a stale/closed shared session-default XMLMessageProducer (e.g., after unsolicited CloseFlow during broker maintenance/DR failover), preventing repeated republish failures that would otherwise exhaust delivery attempts and requeue indefinitely.
Changes:
- Added
SharedResourceManager.forceRecreate()to unconditionally replace a shared resource instance while preserving registrations. - Updated
ErrorQueueInfrastructure.send()with proactiveproducer.isClosed()detection and reactive stale-exception handling that triggers producer recreation for the next retry attempt. - Added unit tests covering proactive recreation, reactive recreation on specific stale exception types, and a negative control for unrelated
JCSMPExceptions.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
.../util/ErrorQueueInfrastructure.java |
Adds proactive + reactive stale-producer detection and calls into forceRecreate() to recover the shared producer used for error-queue republish. |
.../util/SharedResourceManager.java |
Introduces forceRecreate() to close and recreate the shared resource irrespective of registration count. |
.../util/ErrorQueueInfrastructureTest.java |
Adds unit tests validating producer recreation behavior and exception propagation expectations. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Three PR #142 review items: C1 (Copilot) + C3 (mayur-solace) - race in forceRecreate(). The original unconditional implementation could have two callers both observe the same stale shared resource, both enter forceRecreate(), and have the second caller close a healthy replacement that the first caller just installed. Fix: compare-and-swap. forceRecreate now takes an `expected` argument - the reference the caller observed. Under the lock, the manager recreates only if `sharedResource == expected`; otherwise it returns whatever a concurrent caller already installed without closing or re-creating anything. The caller-visible contract is now: pass what you observed, use what's returned. C2 (mayur-solace) - Javadoc on SharedResourceManager.forceRecreate referenced the broker / CloseFlow concern specifically. Since SharedResourceManager is generic and could host non-broker resources in the future, the docs are rewritten to describe the CAS contract generically without naming the JCSMP/broker context. ErrorQueueInfrastructure.send() updated at both call sites to pass the observed producer reference and use the value returned by forceRecreate (which may be the fresh one we requested, or the already-installed replacement another caller put in place). New unit test testErrorQueueProducerUsesManagerReturnedReferenceAfterForceRecreate simulates the exact race C1 flagged: stale producer observed, manager's CAS returns an already-installed replacement, send must use the replacement rather than the locally-observed stale one. Existing tests updated to pass the observed reference and verify CAS arguments. Also aligned the test method names to drop the test_ underscore form, matching the no-underscore convention used elsewhere in the binder-core test suite (e.g. SolaceErrorMessageHandlerTest). 418 binder-core unit tests green (was 417 + 1 new CAS-race test).
Per PR #142 follow-up: the previous Javadoc (24 lines, two paragraphs of explanation) was verbose for an IDE hover. Reduced to a single sentence describing the CAS contract plus the standard param/return/throws.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
The reactive + proactive recreate-on-stale logic added in PR #141 (commits 931f09c..134e7ef) protects each binding's per-binding XMLMessageProducer in JCSMPOutboundMessageHandler. The error-queue republish path in ErrorQueueInfrastructure has the same exposure but on a different producer: it borrows the session-default producer from JCSMPSessionProducerManager via producerManager.get(producerKey) and historically had no recovery logic when the broker tore that producer down via unsolicited CloseFlow.
Failure mode without this fix: when the broker fans out CloseFlow (message-spool maintenance, DR failover, "503: Service Unavailable"), the shared session-default producer is marked closed by JCSMP. Every subsequent error-queue republish in ErrorQueueInfrastructure.send() throws StaleSessionException / JCSMPTransportException / ClosedFacilityException; ErrorQueueRepublishCorrelationKey.handleError() catches at the message-retry level and re-attempts up to maxDeliveryAttempts - all attempts re-using the same dead producer reference, all doomed to fail. After max attempts the message is re-queued onto the original consumer queue, the consumer redelivers it, fails again, hits the error-queue path again, fails again. Net effect: failed-consumer messages disappear from the system after a DR failover or spool maintenance event.
The fix mirrors the outbound-handler approach:
Proactive: at the top of send(), after producerManager.get(...), check producer.isClosed(). If true, call the new producerManager.forceRecreate() to rebuild the shared producer before send is attempted.
Reactive: wrap producer.send(...) in a try-catch. On StaleSessionException, JCSMPTransportException, ClosedFacilityException, or post-failure producer.isClosed(), call forceRecreate() so the next ErrorQueueRepublishCorrelationKey retry-loop iteration picks up a fresh producer. The original exception still propagates so the retry caller can do its errorQueueDeliveryAttempt++ bookkeeping.
The shared producer is reference-counted across the entire session (JCSMPOutboundMessageHandler also registers itself for ref-count purposes even though it uses its own per-binding producer for sends). release() + get() does NOT work as a recovery primitive in production because it only closes the resource when registeredIds.size() <= 1 - in any deployment with at least one outbound binding, the ref-count stays > 1 and release() leaves the dead resource in place. The new forceRecreate() in SharedResourceManager sidesteps the ref-count: it unconditionally closes the current resource and create()s a new one under the existing lock, leaving registrations intact so every already-registered caller picks up the fresh resource on their next get(). Added as a generic method on SharedResourceManager since the recovery contract is independent of the JCSMP specifics.
Tests (ErrorQueueInfrastructureTest, new):
417 binder-core unit tests green (was 411 + 6 new from this commit).
This branch is layered on DATAGO-134580 (PR #141) so the new SharedResourceManager.forceRecreate() and the ErrorQueueInfrastructure changes can be reviewed alongside the related outbound-handler work.