CAMEL-23483: add manual acknowledgment support for camel-nats JetStream consumer#23144
Conversation
|
🌟 Thank you for your contribution to the Apache Camel project! 🌟 🐫 Apache Camel Committers, please review the following items:
|
|
Changes not staged for commit: |
386fea4 to
9413a6a
Compare
oscerd
left a comment
There was a problem hiding this comment.
Claude Code on behalf of Andrea Cosentino — this review checks the PR against the project's CLAUDE.md / contribution rules and does NOT replace CodeRabbit, Sourcery, or SonarCloud.
Overall this is a clean, well-scoped addition that follows the camel-kafka KafkaManualCommit precedent and reuses the existing SynchronizationAdapter plumbing — nice work. A few items below; (1) is blocking because CI is already failing on it.
Blocking
1. Missing regenerated files (CI failure cause).
The Build and test job on Java 21/25 fails on the "Fail if there are uncommitted changes" step. Three files were regenerated locally during a full reactor build but never committed:
modified: catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
modified: dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/NatsComponentBuilderFactory.java
modified: dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
CLAUDE.md: "All generated files must be regenerated and committed (CI checks for uncommitted changes)." Run a full reactor build (or mvn -B install -DskipTests -pl :camel-endpointdsl,:camel-componentdsl,:camel-catalog -am) and add the three files to this PR. CAMEL-23032 (#21607), which introduced ackPolicy, updated these same three files — that is the established pattern for new options.
Suggested
2. Missing upgrade-guide entry — docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc.
The 4.21 upgrade guide already documents recent camel-nats additions under === camel-nats (e.g. pullBatchSize / pullFetchTimeout). A new public-API surface (NatsManualAck interface + manualAck option + CamelNatsManualAck header) is worth a short entry so users browsing what's new in 4.21 can discover it.
3. Test coverage gap on the new public interface.
The ITs cover ack() and nak() only. nakWithDelay(Duration), term() and inProgress() are part of the new public NatsManualAck API but have no tests. Two options that don't add much code:
- Extend the existing IT to exercise
nakWithDelay(Duration)(the redelivery-counter assertion already intestManualNakcovers this with one extra path) andterm()(assert no further delivery within a window). - Add a small unit test against a stub
io.nats.client.Messageto verifyDefaultNatsManualAckdelegation — analogous to how camel-kafka testsKafkaManualCommit.
4. ackPolicy=None interaction is silent.
With ackPolicy=None, the server side has no ack tracking, so calling manualAck.nak() / term() is effectively a no-op. A one-line note in the setManualAck Javadoc — e.g. "Has no effect when ackPolicy=None since the server acknowledges automatically." — would save users a debugging session.
5. JavaDoc says "only applicable when JetStream is enabled" but the consumer doesn't enforce it.
NatsConfiguration.setManualAck documents that the option is JetStream-only, but NatsConsumer.CamelNatsMessageHandler.onMessage sets the CamelNatsManualAck header (and bypasses auto-ack) for non-JetStream too. Either drop the Javadoc claim, or guard the header/bypass behind configuration.isJetstreamEnabled() to keep behavior consistent with the documentation.
Open question
- Should the consumer log a
WARNwhenmanualAck=trueis combined withackPolicy=Noneor withjetstreamEnabled=false, since both make manual ack meaningless? Not strictly required, but it would help users catch misconfigurations early.
This review was generated by an AI agent and may contain inaccuracies. Please verify all suggestions before applying.
| .process(exchange -> { | ||
| // do work ... | ||
|
|
||
| NatsManualAck manualAck = exchange.getIn().getHeader("CamelNatsManualAck", NatsManualAck.class); |
There was a problem hiding this comment.
Minor: prefer the constant over the string literal — matches how camel-kafka docs reference KafkaConstants.MANUAL_COMMIT and keeps the doc example aligned with NatsConstants.NATS_MANUAL_ACK.
| NatsManualAck manualAck = exchange.getIn().getHeader("CamelNatsManualAck", NatsManualAck.class); | |
| NatsManualAck manualAck = exchange.getIn().getHeader(NatsConstants.NATS_MANUAL_ACK, NatsManualAck.class); |
| NatsManualAck manualAck | ||
| = exchange.getIn().getHeader(NatsConstants.NATS_MANUAL_ACK, NatsManualAck.class); | ||
| assertNotNull(manualAck); | ||
| assertInstanceOf(NatsManualAck.class, manualAck); |
There was a problem hiding this comment.
Nit: this assertion is redundant — manualAck is already typed as NatsManualAck by the typed getHeader(..., NatsManualAck.class) call above, so the cast cannot succeed with a different type. The preceding assertNotNull(manualAck) is sufficient; the assertInstanceOf import (line 32) can be removed too.
| assertInstanceOf(NatsManualAck.class, manualAck); |
9413a6a to
fe39ee0
Compare
|
🧪 CI tested the following changed modules:
All tested modules (13 modules)
|
|
information in the upgrade guide should ONLY be added if it affects existing users that UPGRADE. New features like manual ack blah blah should only be documented in its regular doc |
fe39ee0 to
17ed795
Compare
Summary
Claude Code on behalf of Federico Mariani
Add manual ACK support for the camel-nats JetStream consumer, allowing users to control when messages are acknowledged instead of relying on automatic ACK on exchange completion.
manualAckboolean endpoint parameter (defaultfalse)NatsManualAckinterface exposed asCamelNatsManualAckmessage header withack(),nak(),nakWithDelay(Duration),term(), andinProgress()methodsSynchronizationAdapterACK/NAK is skipped; if the user doesn't call any method, NATS redelivers afterackWaitexpiresFollows the same pattern as camel-kafka's
allowManualCommit/KafkaManualCommit.Test plan
NatsJetstreamConsumerManualAckIT— push subscription: manual ack + manual nak with redeliveryNatsJetstreamConsumerManualAckPullIT— pull subscription: same scenariosJira: https://issues.apache.org/jira/browse/CAMEL-23483