Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "jetstreamName": getOrCreateConfiguration(target).setJetstreamName(property(camelContext, java.lang.String.class, value)); return true;
case "lazystartproducer":
case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
case "manualack":
case "manualAck": getOrCreateConfiguration(target).setManualAck(property(camelContext, boolean.class, value)); return true;
case "maxdeliver":
case "maxDeliver": getOrCreateConfiguration(target).setMaxDeliver(property(camelContext, long.class, value)); return true;
case "maxmessages":
Expand Down Expand Up @@ -148,6 +150,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "jetstreamName": return java.lang.String.class;
case "lazystartproducer":
case "lazyStartProducer": return boolean.class;
case "manualack":
case "manualAck": return boolean.class;
case "maxdeliver":
case "maxDeliver": return long.class;
case "maxmessages":
Expand Down Expand Up @@ -235,6 +239,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "jetstreamName": return getOrCreateConfiguration(target).getJetstreamName();
case "lazystartproducer":
case "lazyStartProducer": return target.isLazyStartProducer();
case "manualack":
case "manualAck": return getOrCreateConfiguration(target).isManualAck();
case "maxdeliver":
case "maxDeliver": return getOrCreateConfiguration(target).getMaxDeliver();
case "maxmessages":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "jetstreamName": target.getConfiguration().setJetstreamName(property(camelContext, java.lang.String.class, value)); return true;
case "lazystartproducer":
case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
case "manualack":
case "manualAck": target.getConfiguration().setManualAck(property(camelContext, boolean.class, value)); return true;
case "maxdeliver":
case "maxDeliver": target.getConfiguration().setMaxDeliver(property(camelContext, long.class, value)); return true;
case "maxmessages":
Expand Down Expand Up @@ -141,6 +143,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "jetstreamName": return java.lang.String.class;
case "lazystartproducer":
case "lazyStartProducer": return boolean.class;
case "manualack":
case "manualAck": return boolean.class;
case "maxdeliver":
case "maxDeliver": return long.class;
case "maxmessages":
Expand Down Expand Up @@ -227,6 +231,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "jetstreamName": return target.getConfiguration().getJetstreamName();
case "lazystartproducer":
case "lazyStartProducer": return target.isLazyStartProducer();
case "manualack":
case "manualAck": return target.getConfiguration().isManualAck();
case "maxdeliver":
case "maxDeliver": return target.getConfiguration().getMaxDeliver();
case "maxmessages":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class NatsEndpointUriFactory extends org.apache.camel.support.component.E
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Map<String, String> MULTI_VALUE_PREFIXES;
static {
Set<String> props = new HashSet<>(43);
Set<String> props = new HashSet<>(44);
props.add("ackPolicy");
props.add("ackWait");
props.add("bridgeErrorHandler");
Expand All @@ -41,6 +41,7 @@ public class NatsEndpointUriFactory extends org.apache.camel.support.component.E
props.add("jetstreamEnabled");
props.add("jetstreamName");
props.add("lazyStartProducer");
props.add("manualAck");
props.add("maxDeliver");
props.add("maxMessages");
props.add("maxPingsOut");
Expand Down

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions components/camel-nats/src/main/docs/nats-component.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,28 @@ from("nats:mytopic?maxMessages=5&queueName=myqueue")
.to("mock:result");
----

=== Manual Acknowledgment (JetStream)

When consuming from JetStream, by default messages are automatically acknowledged
after successful route processing, or negatively acknowledged (redelivered) on failure.

To take full control of acknowledgment, set `manualAck=true` on the consumer endpoint.
This disables automatic acknowledgment and exposes a `NatsManualAck` object as the
`CamelNatsManualAck` message header.

[source,java]
----
from("nats:mytopic?jetstreamEnabled=true&jetstreamName=mystream&durableName=myconsumer&pullSubscription=false&manualAck=true")
.process(exchange -> {
// do work ...

NatsManualAck manualAck = exchange.getIn().getHeader("CamelNatsManualAck", NatsManualAck.class);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: prefer the constant over the string literal — matches how camel-kafka docs reference KafkaConstants.MANUAL_COMMIT and keeps the doc example aligned with NatsConstants.NATS_MANUAL_ACK.

Suggested change
NatsManualAck manualAck = exchange.getIn().getHeader("CamelNatsManualAck", NatsManualAck.class);
NatsManualAck manualAck = exchange.getIn().getHeader(NatsConstants.NATS_MANUAL_ACK, NatsManualAck.class);

manualAck.ack();
});
----

If the route completes without calling any NatsManualAck method, the message remains unacknowledged
and NATS will redeliver it after `ackWait` expires (default 30 seconds).


include::spring-boot:partial$starter.adoc[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.nats;

import java.time.Duration;

import io.nats.client.Message;

class DefaultNatsManualAck implements NatsManualAck {

private final Message message;

DefaultNatsManualAck(Message message) {
this.message = message;
}

@Override
public void ack() {
message.ack();
}

@Override
public void nak() {
message.nak();
}

@Override
public void nakWithDelay(Duration delay) {
message.nakWithDelay(delay);
}

@Override
public void term() {
message.term();
}

@Override
public void inProgress() {
message.inProgress();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class NatsConfiguration implements Cloneable {
private long nackWait = 5000;
@UriParam(label = "consumer")
private long maxDeliver;
@UriParam(label = "consumer", defaultValue = "false")
private boolean manualAck;
@UriParam(label = "consumer", defaultValue = "10")
private int poolSize = 10;
@UriParam(label = "common", defaultValue = "true")
Expand Down Expand Up @@ -679,4 +681,25 @@ public long getMaxDeliver() {
public void setMaxDeliver(long maxDeliver) {
this.maxDeliver = maxDeliver;
}

public boolean isManualAck() {
return manualAck;
}

/**
* Whether to allow doing manual acknowledgment via {@link NatsManualAck}.
* <p/>
* If this option is enabled then an instance of {@link NatsManualAck} is stored on the
* {@link org.apache.camel.Exchange} message header, which allows end users to access this API and perform manual
* ack/nak/term operations via the JetStream consumer.
* <p/>
* When enabled, the automatic acknowledgment on exchange completion is disabled. If the user does not call any ack
* method, the message remains unacknowledged and NATS will redeliver it after the ackWait timeout expires.
* <p/>
* This option is only applicable when JetStream is enabled (jetstreamEnabled=true). It has no effect when
* ackPolicy=None since the server acknowledges messages automatically on delivery.
*/
public void setManualAck(boolean manualAck) {
this.manualAck = manualAck;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public interface NatsConstants {
description = "Number of times this message has been delivered (1 = first, > 1 then message has been redelivered)",
javaType = "long", important = true)
String NATS_DELIVERY_COUNTER = "CamelNatsDeliveryCounter";
@Metadata(label = "consumer",
description = "The manual acknowledgment handle for JetStream messages (only set when manualAck=true).",
javaType = "org.apache.camel.component.nats.NatsManualAck")
String NATS_MANUAL_ACK = "CamelNatsManualAck";

String NATS_REQUEST_TIMEOUT_THREAD_PROFILE_NAME = "CamelNatsRequestTimeoutExecutor";
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,15 @@ protected void doStart() throws Exception {
? this.getEndpoint().getConfiguration().getConnection()
: this.getEndpoint().getConnection();

this.executor.submit(new NatsConsumingTask(this.connection, this.getEndpoint().getConfiguration()));
NatsConfiguration config = this.getEndpoint().getConfiguration();
if (config.isManualAck() && !config.isJetstreamEnabled()) {
LOG.warn("manualAck=true has no effect without jetstreamEnabled=true; standard NATS has no acknowledgment");
}
if (config.isManualAck() && config.getAckPolicy() == AckPolicy.None) {
LOG.warn(
"manualAck=true with ackPolicy=None: the server acknowledges automatically on delivery, manual ack/nak calls will have no effect");
}
this.executor.submit(new NatsConsumingTask(this.connection, config));
}

@Override
Expand Down Expand Up @@ -304,35 +312,38 @@ private void setupStandardNatsConsumer(String topic, String queueName, Integer m
class CamelNatsMessageHandler implements MessageHandler {

final boolean ackPolicyNone = configuration.getAckPolicy() == AckPolicy.None;
final boolean manualAckEnabled = configuration.isManualAck() && configuration.isJetstreamEnabled();

@Override
public void onMessage(Message msg) throws InterruptedException {
LOG.debug("Received Message: {}", msg);
final Exchange exchange = NatsConsumer.this.createExchange(false);

exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() {
@Override
public void onComplete(Exchange exchange) {
LOG.debug("ACK");
msg.ack();
}

@Override
public void onFailure(Exchange exchange) {
if (ackPolicyNone) {
// ACK policy is none which means that we should auto ACK even if the message processed failed in Camel
if (!manualAckEnabled) {
exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() {
@Override
public void onComplete(Exchange exchange) {
LOG.debug("ACK");
msg.ack();
} else {
LOG.debug("NACK (delay:{})", configuration.getNackWait());
if (configuration.getNackWait() <= 0) {
msg.nak();
}

@Override
public void onFailure(Exchange exchange) {
if (ackPolicyNone) {
// ACK policy is none which means that we should auto ACK even if the message processed failed in Camel
LOG.debug("ACK");
msg.ack();
} else {
msg.nakWithDelay(configuration.getNackWait());
LOG.debug("NACK (delay:{})", configuration.getNackWait());
if (configuration.getNackWait() <= 0) {
msg.nak();
} else {
msg.nakWithDelay(configuration.getNackWait());
}
}
}
}
});
});
}
try {
exchange.getIn().setBody(msg.getData());
exchange.getIn().setHeader(NatsConstants.NATS_REPLY_TO, msg.getReplyTo());
Expand Down Expand Up @@ -367,6 +378,10 @@ public void onFailure(Exchange exchange) {
exchange.getMessage().setPayloadForTrait(MessageTrait.REDELIVERY,
evalRedeliveryMessageTrait(msg, exchange));

if (manualAckEnabled) {
exchange.getIn().setHeader(NatsConstants.NATS_MANUAL_ACK, new DefaultNatsManualAck(msg));
}

NatsConsumer.this.processor.process(exchange);

// is there a reply?
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.nats;

import java.time.Duration;

/**
* Allows manual acknowledgment of JetStream messages when using the NATS consumer with {@code manualAck=true}.
*
* @see NatsConstants#NATS_MANUAL_ACK
*/
public interface NatsManualAck {

/**
* Acknowledge the message.
*/
void ack();

/**
* Negative acknowledge the message. The message will be redelivered immediately.
*/
void nak();

/**
* Negative acknowledge the message with a delay before redelivery.
*/
void nakWithDelay(Duration delay);

/**
* Terminate delivery of this message. The server will stop redelivering it.
*/
void term();

/**
* Signal that processing is still in progress. Resets the {@code ackWait} timer to prevent the server from
* redelivering while long-running processing is ongoing.
*/
void inProgress();
}
Loading