
NIFI-15483: Route PublishAMQP to failure on undeliverable messages instead of silent success #11213

Open
rakesh-rsky wants to merge 3 commits into apache:main from rakesh-rsky:fix/NIFI-15483-publishamqp-route-on-failure


rakesh-rsky commented May 6, 2026

Summary

PublishAMQP routes FlowFiles to REL_SUCCESS even when the AMQP broker cannot deliver the message, which causes silent data loss.

Two failure modes are addressed:

  1. Undeliverable message (basic.return): because PublishAMQP publishes with mandatory=true, the broker returns any message it cannot route, for example when no queue is bound to the exchange for the routing key. The fix uses AMQP Publisher Confirms together with basic.return to detect these delivery failures and route the FlowFile to REL_FAILURE.

  2. Exchange not found (ShutdownSignalException): when the exchange does not exist, the broker closes the channel with 404 NOT_FOUND, causing waitForConfirms() to throw ShutdownSignalException. This is now caught and converted to AMQPException so the FlowFile routes to REL_FAILURE instead of causing an unhandled processor failure.

Testing

  • Publish to an exchange with no bound queues → FlowFile routes to failure with the broker's return reason
  • Publish to a non-existent exchange → FlowFile routes to failure instead of unhandled processor error
  • Normal publish (queue bound) → still routes to success
  • Added regression tests for all failure scenarios — verified to fail against unfixed code

Fixes: https://issues.apache.org/jira/browse/NIFI-15483

…r cannot deliver message

PublishAMQP uses mandatory=true on basicPublish() so the broker returns
messages it cannot route to any queue. However, the return arrives
asynchronously via ReturnListener.handleReturn() on the AMQP I/O thread,
after the publishing thread has already moved on to
session.transfer(REL_SUCCESS). The UndeliverableMessageLogger only logged a
warning and never signaled failure back to publish() or onTrigger(), so
every unroutable message was silently counted as a success despite never
reaching any consumer.

Fix:
- Enabled Publisher Confirms (channel.confirmSelect()) in the constructor.
  The broker's basic.return frame for an unroutable message is guaranteed
  to arrive before the corresponding confirm frame, so waitForConfirms()
  acts as a synchronization barrier that makes return detection reliable.
- Added an AtomicReference<String> field (undeliverableReturnReason) that
  UndeliverableMessageLogger.handleReturn() populates with exchange/routingKey/
  replyCode/replyText when a message is returned.
- publish() now resets the field before each call, calls waitForConfirms()
  with a 5-second timeout to synchronize with the broker, then checks the
  field and throws AMQPException if the message was returned, which causes
  onTrigger() to route the FlowFile to REL_FAILURE.
- Broker NACKs (e.g., resource alarm) are also now surfaced as AMQPException
  because waitForConfirms() returns false on NACK.

Co-authored-by: Rakesh Kumar Singh <rsky.rakesh@gmail.com>
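
The synchronization described in this commit can be modeled without a broker. The following is a minimal JDK-only sketch, not the code from this PR: `FakeBroker`, `ConfirmingPublisher` and `PublishFailedException` are hypothetical stand-ins for the RabbitMQ channel, `AMQPPublisher` and `AMQPException`. It assumes, as the commit message states, that the return callback fires before the confirm, and that an unroutable message is still acknowledged once it has been returned.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for AMQPException.
class PublishFailedException extends RuntimeException {
    PublishFailedException(String message) {
        super(message);
    }
}

// Hypothetical broker model. All callbacks run on a separate "I/O thread",
// and a return (if any) is always delivered before the confirm.
class FakeBroker implements AutoCloseable {
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    private final boolean routable;
    private final boolean ack;

    FakeBroker(boolean routable, boolean ack) {
        this.routable = routable;
        this.ack = ack;
    }

    CompletableFuture<Boolean> publish(Consumer<String> returnListener) {
        CompletableFuture<Boolean> confirm = new CompletableFuture<>();
        ioThread.execute(() -> {
            if (!routable) {
                returnListener.accept("replyCode=312, replyText=NO_ROUTE");
            }
            confirm.complete(ack); // true models an ACK, false a NACK
        });
        return confirm;
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }
}

// Hypothetical publisher: waits for the confirm, then inspects the return reason.
class ConfirmingPublisher {
    private final AtomicReference<String> undeliverableReturnReason = new AtomicReference<>();
    private final FakeBroker broker;

    ConfirmingPublisher(FakeBroker broker) {
        this.broker = broker;
    }

    void publish() {
        undeliverableReturnReason.set(null); // reset state left by a previous call
        boolean acked;
        try {
            // The confirm is the barrier: any return has been recorded by now.
            acked = broker.publish(undeliverableReturnReason::set).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PublishFailedException("Interrupted while waiting for confirm");
        } catch (ExecutionException | TimeoutException e) {
            throw new PublishFailedException("No confirm received: " + e);
        }
        if (!acked) {
            throw new PublishFailedException("Broker NACKed the message");
        }
        String reason = undeliverableReturnReason.get();
        if (reason != null) {
            throw new PublishFailedException("Message returned as undeliverable: " + reason);
        }
    }
}
```

With `new FakeBroker(false, true)` the call to `publish()` throws because the return reason is recorded before the confirm completes; with `new FakeBroker(true, true)` it returns normally.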
rakesh-rsky force-pushed the fix/NIFI-15483-publishamqp-route-on-failure branch from 5da2156 to ace38df May 6, 2026 11:09
@turcsanyip (Contributor)

@rakesh-rsky Thanks for working on this issue. Please note the unit tests are failing.

I tried the new error handling in my local environment but I'm getting the following runtime error (instead of routing the FlowFile to failure):

2026-05-06 14:34:55,697 ERROR [Timer-Driven Process Thread-4] o.a.nifi.amqp.processors.PublishAMQP PublishAMQP[id=4fd4ee4e-8e3a-3969-7db4-953498193e4d] Processor failure
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'dummy' in vhost '/', class-id=60, method-id=40)
	at com.rabbitmq.client.impl.ChannelN.waitForConfirms(ChannelN.java:219)
	at org.apache.nifi.amqp.processors.AMQPPublisher.publish(AMQPPublisher.java:108)
	at org.apache.nifi.amqp.processors.PublishAMQP.processResource(PublishAMQP.java:185)
	at org.apache.nifi.amqp.processors.PublishAMQP.processResource(PublishAMQP.java:52)
	at org.apache.nifi.amqp.processors.AbstractAMQPProcessor.onTrigger(AbstractAMQPProcessor.java:236)
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1292)
	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:229)
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
	at org.apache.nifi.engine.FlowEngine.lambda$wrap$1(FlowEngine.java:105)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'dummy' in vhost '/', class-id=60, method-id=40)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:529)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:350)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:193)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:125)
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:768)
	at com.rabbitmq.client.impl.AMQConnection.access$400(AMQConnection.java:49)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:695)
	... 1 common frames omitted

turcsanyip self-requested a review May 6, 2026 12:43
…ssing exchange

When the broker closes the channel with a 404 NOT_FOUND error (e.g., exchange
does not exist), waitForConfirms() throws ShutdownSignalException instead of
returning normally. This was propagating as an unhandled processor failure
rather than routing the FlowFile to REL_FAILURE.

- Added ShutdownSignalException to the catch block in AMQPPublisher.publish()
- Converts the channel-close signal into AMQPException so PublishAMQP routes
  the FlowFile to REL_FAILURE with a descriptive error message
- Added ShutdownSignalException import

Co-authored-by: Rakesh Kumar Singh <rsky.rakesh@gmail.com>
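
The conversion described in this commit can be sketched as follows. This is an illustration under stated assumptions, not the PR's code: `ChannelClosedSignal`, `PublishFailure`, `Relationship` and `RoutingSketch` are hypothetical names standing in for `ShutdownSignalException`, `AMQPException`, the NiFi relationships and the publisher/processor pair. `ShutdownSignalException` is an unchecked exception in the RabbitMQ Java client, so the sketch models it as a `RuntimeException` that passes through any handler that does not name it.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for com.rabbitmq.client.ShutdownSignalException (unchecked).
class ChannelClosedSignal extends RuntimeException {
    ChannelClosedSignal(String message) {
        super(message);
    }
}

// Hypothetical stand-in for AMQPException.
class PublishFailure extends RuntimeException {
    PublishFailure(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical stand-in for REL_SUCCESS / REL_FAILURE.
enum Relationship { SUCCESS, FAILURE }

class RoutingSketch {
    // Models publish(): the channel-close signal is converted into the
    // failure type that the caller already knows how to handle.
    static void publish(BooleanSupplier waitForConfirms) {
        try {
            if (!waitForConfirms.getAsBoolean()) {
                throw new PublishFailure("Broker NACKed the message", null);
            }
        } catch (ChannelClosedSignal e) {
            throw new PublishFailure("Channel closed by broker: " + e.getMessage(), e);
        }
    }

    // Models the processor side: only PublishFailure is routed to FAILURE,
    // anything else would escape as an unhandled processor error.
    static Relationship route(BooleanSupplier waitForConfirms) {
        try {
            publish(waitForConfirms);
            return Relationship.SUCCESS;
        } catch (PublishFailure e) {
            return Relationship.FAILURE;
        }
    }
}
```

Passing a supplier that throws `ChannelClosedSignal` to `RoutingSketch.route` yields `Relationship.FAILURE`; without the `catch (ChannelClosedSignal e)` clause the same call would propagate the exception to the caller.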
…dling

Added regression tests to verify that AMQPPublisher and PublishAMQP correctly
route FlowFiles to REL_FAILURE for all broker-side failure modes:

- TestChannel: implemented confirmSelect() (no-op) and waitForConfirms(timeout)
  with simulation flags for ShutdownSignalException and NACK
- TestChannel: added simulateSynchronousReturn flag for deterministic
  undeliverable-message tests (fires ReturnListeners synchronously)
- TestConnection: exposed getTestChannel() for test configuration
- AMQPPublisherTest: added 3 new unit tests
    * failPublishWhenBrokerClosesChannelDuringConfirm
    * failPublishWhenBrokerNacksMessage
    * failPublishWhenMessageReturnedAsUndeliverable
- PublishAMQPTest: added 2 new integration tests
    * validateFlowFileRoutedToFailureWhenBrokerClosesChannel
    * validateFlowFileRoutedToFailureOnBrokerNack

All 45 tests pass. Tests were verified to FAIL without the corresponding fix.

Co-authored-by: Rakesh Kumar Singh <rsky.rakesh@gmail.com>
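
A test double with simulation flags, as described above, might look roughly like this. It is a hypothetical JDK-only sketch and not the PR's `TestChannel`: the class `FakeConfirmChannel`, the exception `FakeChannelClosed`, and the flags `simulateShutdownOnConfirm` and `simulateNack` are invented for illustration; only the `simulateSynchronousReturn` name comes from the commit message. Return listeners are modeled as plain `Consumer<String>` callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ShutdownSignalException.
class FakeChannelClosed extends RuntimeException {
    FakeChannelClosed(String message) {
        super(message);
    }
}

// Hypothetical test double for a confirm-enabled channel.
class FakeConfirmChannel {
    boolean simulateShutdownOnConfirm; // waitForConfirms() throws
    boolean simulateNack;              // waitForConfirms() returns false
    boolean simulateSynchronousReturn; // basicPublish() fires return listeners inline

    private final List<Consumer<String>> returnListeners = new ArrayList<>();

    void addReturnListener(Consumer<String> listener) {
        returnListeners.add(listener);
    }

    // No-op: the double always behaves as if confirms were enabled.
    void confirmSelect() {
    }

    void basicPublish(String exchange, String routingKey, byte[] body) {
        if (simulateSynchronousReturn) {
            // Firing on the calling thread keeps the test deterministic.
            String reason = "exchange=" + exchange + ", routingKey=" + routingKey
                    + ", replyCode=312, replyText=NO_ROUTE";
            for (Consumer<String> listener : returnListeners) {
                listener.accept(reason);
            }
        }
    }

    boolean waitForConfirms(long timeoutMillis) {
        if (simulateShutdownOnConfirm) {
            throw new FakeChannelClosed("reply-code=404, reply-text=NOT_FOUND");
        }
        return !simulateNack;
    }
}
```

Firing the listeners synchronously avoids any dependence on thread scheduling, so a test can assert on the recorded return reason immediately after `basicPublish()`.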
@rakesh-rsky (Author)
@turcsanyip Thank you for validating this.

Root cause: When the exchange does not exist, the broker closes the channel with 404 NOT_FOUND, causing waitForConfirms() to throw ShutdownSignalException — which was not caught, resulting in an unhandled processor failure instead of routing to REL_FAILURE.

This has been fixed in the latest commits — ShutdownSignalException is now caught and converted to AMQPException, along with regression tests covering this and related broker failure scenarios.
