diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json index a83b6b4b6373f..16a6c71fa2bbc 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json @@ -45,28 +45,29 @@ "ackWait": { "index": 18, "kind": "property", "displayName": "Ack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "After a message is delivered to a consumer, the server waits 30 seconds (default) for an acknowledgement. If none arrives (timeout), the message becomes eligible for redelivery." }, "bridgeErrorHandler": { "index": 19, "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, "durableName": { "index": 20, "kind": "property", "displayName": "Durable Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the name to assign to the JetStream durable consumer. Setting this value makes the consumer durable. The value is used to set the durable() field in the underlying NATS ConsumerConfiguration.Builder." }, - "maxDeliver": { "index": 21, "kind": "property", "displayName": "Max Deliver", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of attempts to deliver a message from Nats to a consumer. Once MaxDeliver is reached, the NATS server stops attempting to deliver that specific message. The message is not deleted, it remains in the stream but is simply skipped. It is recommended to set this option to a sensible value in case a message is poison and can not successfully be processed and would always keep failing." }, - "maxMessages": { "index": 22, "kind": "property", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, - "nackWait": { "index": 23, "kind": "property", "displayName": "Nack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "For negative acknowledgements (NAK), redelivery is delayed by 5 seconds (default). Setting this to 0 or negative makes the redelivery immediately. Be careful as this can cause the consumer to keep re-processing the same message over and over again due to intermediate error that last a while." }, - "poolSize": { "index": 24, "kind": "property", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, - "pullBatchSize": { "index": 25, "kind": "property", "displayName": "Pull Batch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of messages to fetch per pull request when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, - "pullFetchTimeout": { "index": 26, "kind": "property", "displayName": "Pull Fetch Timeout", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum time (in milliseconds) to wait for a batch of messages to be available on the server during a single fetch when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, - "pullSubscription": { "index": 27, "kind": "property", "displayName": "Pull Subscription", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the consumer subscription type for JetStream. Set to true to use a Pull Subscription (consumer explicitly requests messages). Set to false to use a Push Subscription (messages are automatically delivered)." }, - "queueName": { "index": 28, "kind": "property", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, - "replyToDisabled": { "index": 29, "kind": "property", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, - "consumerConfiguration": { "index": 30, "kind": "property", "displayName": "Consumer Configuration", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets a custom ConsumerConfiguration object for the JetStream consumer. This is an advanced option typically used when you need to configure properties not exposed as simple Camel URI parameters. When set, this object will be used to build the final consumer subscription options." }, - "lazyStartProducer": { "index": 31, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, - "replySubject": { "index": 32, "kind": "property", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, - "requestTimeout": { "index": 33, "kind": "property", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, - "autowiredEnabled": { "index": 34, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." }, - "connection": { "index": 35, "kind": "property", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, - "headerFilterStrategy": { "index": 36, "kind": "property", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." }, - "jetstreamAsync": { "index": 37, "kind": "property", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, - "traceConnection": { "index": 38, "kind": "property", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine grained debugging of connection issues." }, - "credentialsFilePath": { "index": 39, "kind": "property", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems." }, - "secure": { "index": 40, "kind": "property", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, - "sslContextParameters": { "index": 41, "kind": "property", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" }, - "useGlobalSslContextParameters": { "index": 42, "kind": "property", "displayName": "Use Global Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enable usage of global SSL context parameters." } + "manualAck": { "index": 21, "kind": "property", "displayName": "Manual Ack", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether to allow doing manual acknowledgment via NatsManualAck. If this option is enabled then an instance of NatsManualAck is stored on the org.apache.camel.Exchange message header, which allows end users to access this API and perform manual ack\/nak\/term operations via the JetStream consumer. When enabled, the automatic acknowledgment on exchange completion is disabled. If the user does not call any ack method, the message remains unacknowledged and NATS will redeliver it after the ackWait timeout expires. This option is only applicable when JetStream is enabled (jetstreamEnabled=true). It has no effect when ackPolicy=None since the server acknowledges messages automatically on delivery." }, + "maxDeliver": { "index": 22, "kind": "property", "displayName": "Max Deliver", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of attempts to deliver a message from Nats to a consumer. Once MaxDeliver is reached, the NATS server stops attempting to deliver that specific message. The message is not deleted, it remains in the stream but is simply skipped. It is recommended to set this option to a sensible value in case a message is poison and can not successfully be processed and would always keep failing." }, + "maxMessages": { "index": 23, "kind": "property", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, + "nackWait": { "index": 24, "kind": "property", "displayName": "Nack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "For negative acknowledgements (NAK), redelivery is delayed by 5 seconds (default). Setting this to 0 or negative makes the redelivery immediately. Be careful as this can cause the consumer to keep re-processing the same message over and over again due to intermediate error that last a while." }, + "poolSize": { "index": 25, "kind": "property", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, + "pullBatchSize": { "index": 26, "kind": "property", "displayName": "Pull Batch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of messages to fetch per pull request when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, + "pullFetchTimeout": { "index": 27, "kind": "property", "displayName": "Pull Fetch Timeout", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum time (in milliseconds) to wait for a batch of messages to be available on the server during a single fetch when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, + "pullSubscription": { "index": 28, "kind": "property", "displayName": "Pull Subscription", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the consumer subscription type for JetStream. Set to true to use a Pull Subscription (consumer explicitly requests messages). Set to false to use a Push Subscription (messages are automatically delivered)." }, + "queueName": { "index": 29, "kind": "property", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, + "replyToDisabled": { "index": 30, "kind": "property", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, + "consumerConfiguration": { "index": 31, "kind": "property", "displayName": "Consumer Configuration", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets a custom ConsumerConfiguration object for the JetStream consumer. This is an advanced option typically used when you need to configure properties not exposed as simple Camel URI parameters. When set, this object will be used to build the final consumer subscription options." }, + "lazyStartProducer": { "index": 32, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, + "replySubject": { "index": 33, "kind": "property", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, + "requestTimeout": { "index": 34, "kind": "property", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, + "autowiredEnabled": { "index": 35, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." }, + "connection": { "index": 36, "kind": "property", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, + "headerFilterStrategy": { "index": 37, "kind": "property", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." }, + "jetstreamAsync": { "index": 38, "kind": "property", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, + "traceConnection": { "index": 39, "kind": "property", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine grained debugging of connection issues." }, + "credentialsFilePath": { "index": 40, "kind": "property", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems." }, + "secure": { "index": 41, "kind": "property", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, + "sslContextParameters": { "index": 42, "kind": "property", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" }, + "useGlobalSslContextParameters": { "index": 43, "kind": "property", "displayName": "Use Global Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enable usage of global SSL context parameters." } }, "headers": { "CamelNatsMessageTimestamp": { "index": 0, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of a consumed message.", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_MESSAGE_TIMESTAMP" }, @@ -76,7 +77,8 @@ "CamelNatsQueueName": { "index": 4, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Queue name of a consumed message (may be null).", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_QUEUE_NAME" }, "CamelNatsStatusCode": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Status message code", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_STATUS_CODE" }, "CamelNatsStatusError": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Status message error message", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_STATUS_ERROR" }, - "CamelNatsDeliveryCounter": { "index": 7, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Number of times this message has been delivered (1 = first, 1 then message has been redelivered)", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_DELIVERY_COUNTER" } + "CamelNatsDeliveryCounter": { "index": 7, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Number of times this message has been delivered (1 = first, 1 then message has been redelivered)", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_DELIVERY_COUNTER" }, + "CamelNatsManualAck": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.nats.NatsManualAck", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The manual acknowledgment handle for JetStream messages (only set when manualAck=true).", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_MANUAL_ACK" } }, "properties": { "topic": { "index": 0, "kind": "path", "displayName": "Topic", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The name of topic we want to use" }, @@ -99,28 +101,29 @@ "ackPolicy": { "index": 17, "kind": "parameter", "displayName": "Ack Policy", "group": "consumer", "label": "consumer", "required": false, "type": "enum", "javaType": "io.nats.client.api.AckPolicy", "enum": [ "None", "All", "Explicit" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "Explicit", "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Acknowledgement mode. none = Messages are acknowledged as soon as the server sends them (danger: messages that Camel failed to process is also ack). Clients do not need to ack. all = All messages with a sequence number less than the message acked are also acknowledged. E.g. reading a batch of messages 1..100. Ack on message 100 will acknowledge 1..99 as well. explicit (default) = Each message is acknowledged individually by Camel after the message has been processed, this ensures the message is only ack if success and nack if processing failed due to an exception during routing. Message can be acked out of sequence and create gaps of unacknowledged messages in the consumer." }, "ackWait": { "index": 18, "kind": "parameter", "displayName": "Ack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "After a message is delivered to a consumer, the server waits 30 seconds (default) for an acknowledgement. If none arrives (timeout), the message becomes eligible for redelivery." }, "durableName": { "index": 19, "kind": "parameter", "displayName": "Durable Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the name to assign to the JetStream durable consumer. Setting this value makes the consumer durable. The value is used to set the durable() field in the underlying NATS ConsumerConfiguration.Builder." }, - "maxDeliver": { "index": 20, "kind": "parameter", "displayName": "Max Deliver", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of attempts to deliver a message from Nats to a consumer. Once MaxDeliver is reached, the NATS server stops attempting to deliver that specific message. The message is not deleted, it remains in the stream but is simply skipped. It is recommended to set this option to a sensible value in case a message is poison and can not successfully be processed and would always keep failing." }, - "maxMessages": { "index": 21, "kind": "parameter", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, - "nackWait": { "index": 22, "kind": "parameter", "displayName": "Nack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "For negative acknowledgements (NAK), redelivery is delayed by 5 seconds (default). Setting this to 0 or negative makes the redelivery immediately. Be careful as this can cause the consumer to keep re-processing the same message over and over again due to intermediate error that last a while." }, - "poolSize": { "index": 23, "kind": "parameter", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, - "pullBatchSize": { "index": 24, "kind": "parameter", "displayName": "Pull Batch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of messages to fetch per pull request when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, - "pullFetchTimeout": { "index": 25, "kind": "parameter", "displayName": "Pull Fetch Timeout", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum time (in milliseconds) to wait for a batch of messages to be available on the server during a single fetch when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, - "pullSubscription": { "index": 26, "kind": "parameter", "displayName": "Pull Subscription", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the consumer subscription type for JetStream. Set to true to use a Pull Subscription (consumer explicitly requests messages). Set to false to use a Push Subscription (messages are automatically delivered)." }, - "queueName": { "index": 27, "kind": "parameter", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, - "replyToDisabled": { "index": 28, "kind": "parameter", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, - "bridgeErrorHandler": { "index": 29, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, - "consumerConfiguration": { "index": 30, "kind": "parameter", "displayName": "Consumer Configuration", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets a custom ConsumerConfiguration object for the JetStream consumer. This is an advanced option typically used when you need to configure properties not exposed as simple Camel URI parameters. When set, this object will be used to build the final consumer subscription options." }, - "exceptionHandler": { "index": 31, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, - "exchangePattern": { "index": 32, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "replySubject": { "index": 33, "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, - "requestTimeout": { "index": 34, "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, - "lazyStartProducer": { "index": 35, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, - "connection": { "index": 36, "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, - "headerFilterStrategy": { "index": 37, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." }, - "jetstreamAsync": { "index": 38, "kind": "parameter", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, - "traceConnection": { "index": 39, "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine grained debugging of connection issues." }, - "credentialsFilePath": { "index": 40, "kind": "parameter", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems." }, - "secure": { "index": 41, "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, - "sslContextParameters": { "index": 42, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" } + "manualAck": { "index": 20, "kind": "parameter", "displayName": "Manual Ack", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether to allow doing manual acknowledgment via NatsManualAck. If this option is enabled then an instance of NatsManualAck is stored on the org.apache.camel.Exchange message header, which allows end users to access this API and perform manual ack\/nak\/term operations via the JetStream consumer. When enabled, the automatic acknowledgment on exchange completion is disabled. If the user does not call any ack method, the message remains unacknowledged and NATS will redeliver it after the ackWait timeout expires. This option is only applicable when JetStream is enabled (jetstreamEnabled=true). It has no effect when ackPolicy=None since the server acknowledges messages automatically on delivery." }, + "maxDeliver": { "index": 21, "kind": "parameter", "displayName": "Max Deliver", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of attempts to deliver a message from Nats to a consumer. Once MaxDeliver is reached, the NATS server stops attempting to deliver that specific message. The message is not deleted, it remains in the stream but is simply skipped. It is recommended to set this option to a sensible value in case a message is poison and can not successfully be processed and would always keep failing." }, + "maxMessages": { "index": 22, "kind": "parameter", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, + "nackWait": { "index": 23, "kind": "parameter", "displayName": "Nack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "For negative acknowledgements (NAK), redelivery is delayed by 5 seconds (default). Setting this to 0 or negative makes the redelivery immediately. Be careful as this can cause the consumer to keep re-processing the same message over and over again due to intermediate error that last a while." }, + "poolSize": { "index": 24, "kind": "parameter", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, + "pullBatchSize": { "index": 25, "kind": "parameter", "displayName": "Pull Batch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of messages to fetch per pull request when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, + "pullFetchTimeout": { "index": 26, "kind": "parameter", "displayName": "Pull Fetch Timeout", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum time (in milliseconds) to wait for a batch of messages to be available on the server during a single fetch when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, + "pullSubscription": { "index": 27, "kind": "parameter", "displayName": "Pull Subscription", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the consumer subscription type for JetStream. Set to true to use a Pull Subscription (consumer explicitly requests messages). Set to false to use a Push Subscription (messages are automatically delivered)." }, + "queueName": { "index": 28, "kind": "parameter", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, + "replyToDisabled": { "index": 29, "kind": "parameter", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, + "bridgeErrorHandler": { "index": 30, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, + "consumerConfiguration": { "index": 31, "kind": "parameter", "displayName": "Consumer Configuration", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets a custom ConsumerConfiguration object for the JetStream consumer. This is an advanced option typically used when you need to configure properties not exposed as simple Camel URI parameters. When set, this object will be used to build the final consumer subscription options." }, + "exceptionHandler": { "index": 32, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, + "exchangePattern": { "index": 33, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "replySubject": { "index": 34, "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, + "requestTimeout": { "index": 35, "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, + "lazyStartProducer": { "index": 36, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, + "connection": { "index": 37, "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, + "headerFilterStrategy": { "index": 38, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." }, + "jetstreamAsync": { "index": 39, "kind": "parameter", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, + "traceConnection": { "index": 40, "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine grained debugging of connection issues." }, + "credentialsFilePath": { "index": 41, "kind": "parameter", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems." }, + "secure": { "index": 42, "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, + "sslContextParameters": { "index": 43, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" } } } diff --git a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsComponentConfigurer.java b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsComponentConfigurer.java index d0891a119914d..b9f2658bec574 100644 --- a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsComponentConfigurer.java +++ b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsComponentConfigurer.java @@ -62,6 +62,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj case "jetstreamName": getOrCreateConfiguration(target).setJetstreamName(property(camelContext, java.lang.String.class, value)); return true; case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; + case "manualack": + case "manualAck": getOrCreateConfiguration(target).setManualAck(property(camelContext, boolean.class, value)); return true; case "maxdeliver": case "maxDeliver": getOrCreateConfiguration(target).setMaxDeliver(property(camelContext, long.class, value)); return true; case "maxmessages": @@ -148,6 +150,8 @@ public Class getOptionType(String name, boolean ignoreCase) { case "jetstreamName": return java.lang.String.class; case "lazystartproducer": case "lazyStartProducer": return boolean.class; + case "manualack": + case "manualAck": return boolean.class; case "maxdeliver": case "maxDeliver": return long.class; case "maxmessages": @@ -235,6 +239,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) { case "jetstreamName": return getOrCreateConfiguration(target).getJetstreamName(); case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); + case "manualack": + case "manualAck": return getOrCreateConfiguration(target).isManualAck(); case "maxdeliver": case "maxDeliver": return getOrCreateConfiguration(target).getMaxDeliver(); case "maxmessages": diff --git a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java index 923781a8bc8a5..162644fb2792f 100644 --- a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java +++ b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java @@ -56,6 +56,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj case "jetstreamName": target.getConfiguration().setJetstreamName(property(camelContext, java.lang.String.class, value)); return true; case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; + case "manualack": + case "manualAck": target.getConfiguration().setManualAck(property(camelContext, boolean.class, value)); return true; case "maxdeliver": case "maxDeliver": target.getConfiguration().setMaxDeliver(property(camelContext, long.class, value)); return true; case "maxmessages": @@ -141,6 +143,8 @@ public Class getOptionType(String name, boolean ignoreCase) { case "jetstreamName": return java.lang.String.class; case "lazystartproducer": case "lazyStartProducer": return boolean.class; + case "manualack": + case "manualAck": return boolean.class; case "maxdeliver": case "maxDeliver": return long.class; case "maxmessages": @@ -227,6 +231,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) { case "jetstreamName": return target.getConfiguration().getJetstreamName(); case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); + case "manualack": + case "manualAck": return target.getConfiguration().isManualAck(); case "maxdeliver": case "maxDeliver": return target.getConfiguration().getMaxDeliver(); case "maxmessages": diff --git a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java index 542551087298c..fd6be14547dd6 100644 --- a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java +++ b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java @@ -23,7 +23,7 @@ public class NatsEndpointUriFactory extends org.apache.camel.support.component.E private static final Set SECRET_PROPERTY_NAMES; private static final Map MULTI_VALUE_PREFIXES; static { - Set props = new HashSet<>(43); + Set props = new HashSet<>(44); props.add("ackPolicy"); props.add("ackWait"); props.add("bridgeErrorHandler"); @@ -41,6 +41,7 @@ public class NatsEndpointUriFactory extends org.apache.camel.support.component.E props.add("jetstreamEnabled"); props.add("jetstreamName"); props.add("lazyStartProducer"); + props.add("manualAck"); props.add("maxDeliver"); props.add("maxMessages"); props.add("maxPingsOut"); diff --git a/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json b/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json index a83b6b4b6373f..16a6c71fa2bbc 100644 --- a/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json +++ b/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json @@ -45,28 +45,29 @@ "ackWait": { "index": 18, "kind": "property", "displayName": "Ack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "After a message is delivered to a consumer, the server waits 30 seconds (default) for an acknowledgement. If none arrives (timeout), the message becomes eligible for redelivery." }, "bridgeErrorHandler": { "index": 19, "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, "durableName": { "index": 20, "kind": "property", "displayName": "Durable Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the name to assign to the JetStream durable consumer. Setting this value makes the consumer durable. The value is used to set the durable() field in the underlying NATS ConsumerConfiguration.Builder." }, - "maxDeliver": { "index": 21, "kind": "property", "displayName": "Max Deliver", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of attempts to deliver a message from Nats to a consumer. Once MaxDeliver is reached, the NATS server stops attempting to deliver that specific message. The message is not deleted, it remains in the stream but is simply skipped. It is recommended to set this option to a sensible value in case a message is poison and can not successfully be processed and would always keep failing." }, - "maxMessages": { "index": 22, "kind": "property", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, - "nackWait": { "index": 23, "kind": "property", "displayName": "Nack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "For negative acknowledgements (NAK), redelivery is delayed by 5 seconds (default). Setting this to 0 or negative makes the redelivery immediately. Be careful as this can cause the consumer to keep re-processing the same message over and over again due to intermediate error that last a while." }, - "poolSize": { "index": 24, "kind": "property", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, - "pullBatchSize": { "index": 25, "kind": "property", "displayName": "Pull Batch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of messages to fetch per pull request when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, - "pullFetchTimeout": { "index": 26, "kind": "property", "displayName": "Pull Fetch Timeout", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum time (in milliseconds) to wait for a batch of messages to be available on the server during a single fetch when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, - "pullSubscription": { "index": 27, "kind": "property", "displayName": "Pull Subscription", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the consumer subscription type for JetStream. Set to true to use a Pull Subscription (consumer explicitly requests messages). Set to false to use a Push Subscription (messages are automatically delivered)." }, - "queueName": { "index": 28, "kind": "property", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, - "replyToDisabled": { "index": 29, "kind": "property", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, - "consumerConfiguration": { "index": 30, "kind": "property", "displayName": "Consumer Configuration", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets a custom ConsumerConfiguration object for the JetStream consumer. This is an advanced option typically used when you need to configure properties not exposed as simple Camel URI parameters. When set, this object will be used to build the final consumer subscription options." }, - "lazyStartProducer": { "index": 31, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, - "replySubject": { "index": 32, "kind": "property", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, - "requestTimeout": { "index": 33, "kind": "property", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, - "autowiredEnabled": { "index": 34, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." }, - "connection": { "index": 35, "kind": "property", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, - "headerFilterStrategy": { "index": 36, "kind": "property", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." }, - "jetstreamAsync": { "index": 37, "kind": "property", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, - "traceConnection": { "index": 38, "kind": "property", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine grained debugging of connection issues." }, - "credentialsFilePath": { "index": 39, "kind": "property", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems." }, - "secure": { "index": 40, "kind": "property", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, - "sslContextParameters": { "index": 41, "kind": "property", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" }, - "useGlobalSslContextParameters": { "index": 42, "kind": "property", "displayName": "Use Global Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enable usage of global SSL context parameters." } + "manualAck": { "index": 21, "kind": "property", "displayName": "Manual Ack", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether to allow doing manual acknowledgment via NatsManualAck. If this option is enabled then an instance of NatsManualAck is stored on the org.apache.camel.Exchange message header, which allows end users to access this API and perform manual ack\/nak\/term operations via the JetStream consumer. When enabled, the automatic acknowledgment on exchange completion is disabled. If the user does not call any ack method, the message remains unacknowledged and NATS will redeliver it after the ackWait timeout expires. This option is only applicable when JetStream is enabled (jetstreamEnabled=true). It has no effect when ackPolicy=None since the server acknowledges messages automatically on delivery." }, + "maxDeliver": { "index": 22, "kind": "property", "displayName": "Max Deliver", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of attempts to deliver a message from Nats to a consumer. Once MaxDeliver is reached, the NATS server stops attempting to deliver that specific message. The message is not deleted, it remains in the stream but is simply skipped. It is recommended to set this option to a sensible value in case a message is poison and can not successfully be processed and would always keep failing." }, + "maxMessages": { "index": 23, "kind": "property", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, + "nackWait": { "index": 24, "kind": "property", "displayName": "Nack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "For negative acknowledgements (NAK), redelivery is delayed by 5 seconds (default). Setting this to 0 or negative makes the redelivery immediately. Be careful as this can cause the consumer to keep re-processing the same message over and over again due to intermediate error that last a while." }, + "poolSize": { "index": 25, "kind": "property", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, + "pullBatchSize": { "index": 26, "kind": "property", "displayName": "Pull Batch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of messages to fetch per pull request when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, + "pullFetchTimeout": { "index": 27, "kind": "property", "displayName": "Pull Fetch Timeout", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum time (in milliseconds) to wait for a batch of messages to be available on the server during a single fetch when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, + "pullSubscription": { "index": 28, "kind": "property", "displayName": "Pull Subscription", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the consumer subscription type for JetStream. Set to true to use a Pull Subscription (consumer explicitly requests messages). Set to false to use a Push Subscription (messages are automatically delivered)." }, + "queueName": { "index": 29, "kind": "property", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, + "replyToDisabled": { "index": 30, "kind": "property", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, + "consumerConfiguration": { "index": 31, "kind": "property", "displayName": "Consumer Configuration", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets a custom ConsumerConfiguration object for the JetStream consumer. This is an advanced option typically used when you need to configure properties not exposed as simple Camel URI parameters. When set, this object will be used to build the final consumer subscription options." }, + "lazyStartProducer": { "index": 32, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, + "replySubject": { "index": 33, "kind": "property", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, + "requestTimeout": { "index": 34, "kind": "property", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, + "autowiredEnabled": { "index": 35, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." }, + "connection": { "index": 36, "kind": "property", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, + "headerFilterStrategy": { "index": 37, "kind": "property", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." }, + "jetstreamAsync": { "index": 38, "kind": "property", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, + "traceConnection": { "index": 39, "kind": "property", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine grained debugging of connection issues." }, + "credentialsFilePath": { "index": 40, "kind": "property", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems." }, + "secure": { "index": 41, "kind": "property", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, + "sslContextParameters": { "index": 42, "kind": "property", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" }, + "useGlobalSslContextParameters": { "index": 43, "kind": "property", "displayName": "Use Global Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enable usage of global SSL context parameters." } }, "headers": { "CamelNatsMessageTimestamp": { "index": 0, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of a consumed message.", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_MESSAGE_TIMESTAMP" }, @@ -76,7 +77,8 @@ "CamelNatsQueueName": { "index": 4, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Queue name of a consumed message (may be null).", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_QUEUE_NAME" }, "CamelNatsStatusCode": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Status message code", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_STATUS_CODE" }, "CamelNatsStatusError": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Status message error message", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_STATUS_ERROR" }, - "CamelNatsDeliveryCounter": { "index": 7, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Number of times this message has been delivered (1 = first, 1 then message has been redelivered)", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_DELIVERY_COUNTER" } + "CamelNatsDeliveryCounter": { "index": 7, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Number of times this message has been delivered (1 = first, 1 then message has been redelivered)", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_DELIVERY_COUNTER" }, + "CamelNatsManualAck": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.nats.NatsManualAck", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The manual acknowledgment handle for JetStream messages (only set when manualAck=true).", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_MANUAL_ACK" } }, "properties": { "topic": { "index": 0, "kind": "path", "displayName": "Topic", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The name of topic we want to use" }, @@ -99,28 +101,29 @@ "ackPolicy": { "index": 17, "kind": "parameter", "displayName": "Ack Policy", "group": "consumer", "label": "consumer", "required": false, "type": "enum", "javaType": "io.nats.client.api.AckPolicy", "enum": [ "None", "All", "Explicit" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "Explicit", "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Acknowledgement mode. none = Messages are acknowledged as soon as the server sends them (danger: messages that Camel failed to process is also ack). Clients do not need to ack. all = All messages with a sequence number less than the message acked are also acknowledged. E.g. reading a batch of messages 1..100. Ack on message 100 will acknowledge 1..99 as well. explicit (default) = Each message is acknowledged individually by Camel after the message has been processed, this ensures the message is only ack if success and nack if processing failed due to an exception during routing. Message can be acked out of sequence and create gaps of unacknowledged messages in the consumer." }, "ackWait": { "index": 18, "kind": "parameter", "displayName": "Ack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "After a message is delivered to a consumer, the server waits 30 seconds (default) for an acknowledgement. If none arrives (timeout), the message becomes eligible for redelivery." }, "durableName": { "index": 19, "kind": "parameter", "displayName": "Durable Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the name to assign to the JetStream durable consumer. Setting this value makes the consumer durable. The value is used to set the durable() field in the underlying NATS ConsumerConfiguration.Builder." }, - "maxDeliver": { "index": 20, "kind": "parameter", "displayName": "Max Deliver", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of attempts to deliver a message from Nats to a consumer. Once MaxDeliver is reached, the NATS server stops attempting to deliver that specific message. The message is not deleted, it remains in the stream but is simply skipped. It is recommended to set this option to a sensible value in case a message is poison and can not successfully be processed and would always keep failing." }, - "maxMessages": { "index": 21, "kind": "parameter", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, - "nackWait": { "index": 22, "kind": "parameter", "displayName": "Nack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "For negative acknowledgements (NAK), redelivery is delayed by 5 seconds (default). Setting this to 0 or negative makes the redelivery immediately. Be careful as this can cause the consumer to keep re-processing the same message over and over again due to intermediate error that last a while." }, - "poolSize": { "index": 23, "kind": "parameter", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, - "pullBatchSize": { "index": 24, "kind": "parameter", "displayName": "Pull Batch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of messages to fetch per pull request when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, - "pullFetchTimeout": { "index": 25, "kind": "parameter", "displayName": "Pull Fetch Timeout", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum time (in milliseconds) to wait for a batch of messages to be available on the server during a single fetch when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, - "pullSubscription": { "index": 26, "kind": "parameter", "displayName": "Pull Subscription", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the consumer subscription type for JetStream. Set to true to use a Pull Subscription (consumer explicitly requests messages). Set to false to use a Push Subscription (messages are automatically delivered)." }, - "queueName": { "index": 27, "kind": "parameter", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, - "replyToDisabled": { "index": 28, "kind": "parameter", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, - "bridgeErrorHandler": { "index": 29, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, - "consumerConfiguration": { "index": 30, "kind": "parameter", "displayName": "Consumer Configuration", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets a custom ConsumerConfiguration object for the JetStream consumer. This is an advanced option typically used when you need to configure properties not exposed as simple Camel URI parameters. When set, this object will be used to build the final consumer subscription options." }, - "exceptionHandler": { "index": 31, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, - "exchangePattern": { "index": 32, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "replySubject": { "index": 33, "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, - "requestTimeout": { "index": 34, "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, - "lazyStartProducer": { "index": 35, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, - "connection": { "index": 36, "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, - "headerFilterStrategy": { "index": 37, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." }, - "jetstreamAsync": { "index": 38, "kind": "parameter", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, - "traceConnection": { "index": 39, "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine grained debugging of connection issues." }, - "credentialsFilePath": { "index": 40, "kind": "parameter", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems." }, - "secure": { "index": 41, "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, - "sslContextParameters": { "index": 42, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" } + "manualAck": { "index": 20, "kind": "parameter", "displayName": "Manual Ack", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether to allow doing manual acknowledgment via NatsManualAck. If this option is enabled then an instance of NatsManualAck is stored on the org.apache.camel.Exchange message header, which allows end users to access this API and perform manual ack\/nak\/term operations via the JetStream consumer. When enabled, the automatic acknowledgment on exchange completion is disabled. If the user does not call any ack method, the message remains unacknowledged and NATS will redeliver it after the ackWait timeout expires. This option is only applicable when JetStream is enabled (jetstreamEnabled=true). It has no effect when ackPolicy=None since the server acknowledges messages automatically on delivery." }, + "maxDeliver": { "index": 21, "kind": "parameter", "displayName": "Max Deliver", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of attempts to deliver a message from Nats to a consumer. Once MaxDeliver is reached, the NATS server stops attempting to deliver that specific message. The message is not deleted, it remains in the stream but is simply skipped. It is recommended to set this option to a sensible value in case a message is poison and can not successfully be processed and would always keep failing." }, + "maxMessages": { "index": 22, "kind": "parameter", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, + "nackWait": { "index": 23, "kind": "parameter", "displayName": "Nack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "For negative acknowledgements (NAK), redelivery is delayed by 5 seconds (default). Setting this to 0 or negative makes the redelivery immediately. Be careful as this can cause the consumer to keep re-processing the same message over and over again due to intermediate error that last a while." }, + "poolSize": { "index": 24, "kind": "parameter", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, + "pullBatchSize": { "index": 25, "kind": "parameter", "displayName": "Pull Batch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum number of messages to fetch per pull request when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, + "pullFetchTimeout": { "index": 26, "kind": "parameter", "displayName": "Pull Fetch Timeout", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Maximum time (in milliseconds) to wait for a batch of messages to be available on the server during a single fetch when using a JetStream Pull Subscription. Only used when {code pullSubscription=true}." }, + "pullSubscription": { "index": 27, "kind": "parameter", "displayName": "Pull Subscription", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the consumer subscription type for JetStream. Set to true to use a Pull Subscription (consumer explicitly requests messages). Set to false to use a Push Subscription (messages are automatically delivered)." }, + "queueName": { "index": 28, "kind": "parameter", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, + "replyToDisabled": { "index": 29, "kind": "parameter", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, + "bridgeErrorHandler": { "index": 30, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, + "consumerConfiguration": { "index": 31, "kind": "parameter", "displayName": "Consumer Configuration", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets a custom ConsumerConfiguration object for the JetStream consumer. This is an advanced option typically used when you need to configure properties not exposed as simple Camel URI parameters. When set, this object will be used to build the final consumer subscription options." }, + "exceptionHandler": { "index": 32, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, + "exchangePattern": { "index": 33, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "replySubject": { "index": 34, "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, + "requestTimeout": { "index": 35, "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, + "lazyStartProducer": { "index": 36, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, + "connection": { "index": 37, "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, + "headerFilterStrategy": { "index": 38, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." }, + "jetstreamAsync": { "index": 39, "kind": "parameter", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, + "traceConnection": { "index": 40, "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine grained debugging of connection issues." }, + "credentialsFilePath": { "index": 41, "kind": "parameter", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems." }, + "secure": { "index": 42, "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, + "sslContextParameters": { "index": 43, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" } } } diff --git a/components/camel-nats/src/main/docs/nats-component.adoc b/components/camel-nats/src/main/docs/nats-component.adoc index 7ad6f601835ef..368adc97304eb 100644 --- a/components/camel-nats/src/main/docs/nats-component.adoc +++ b/components/camel-nats/src/main/docs/nats-component.adoc @@ -125,5 +125,28 @@ from("nats:mytopic?maxMessages=5&queueName=myqueue") .to("mock:result"); ---- +=== Manual Acknowledgment (JetStream) + +When consuming from JetStream, by default messages are automatically acknowledged +after successful route processing, or negatively acknowledged (redelivered) on failure. + +To take full control of acknowledgment, set `manualAck=true` on the consumer endpoint. +This disables automatic acknowledgment and exposes a `NatsManualAck` object as the +`CamelNatsManualAck` message header. + +[source,java] +---- +from("nats:mytopic?jetstreamEnabled=true&jetstreamName=mystream&durableName=myconsumer&pullSubscription=false&manualAck=true") + .process(exchange -> { + // do work ... + + NatsManualAck manualAck = exchange.getIn().getHeader("CamelNatsManualAck", NatsManualAck.class); + manualAck.ack(); + }); +---- + +If the route completes without calling any NatsManualAck method, the message remains unacknowledged +and NATS will redeliver it after `ackWait` expires (default 30 seconds). + include::spring-boot:partial$starter.adoc[] diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/DefaultNatsManualAck.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/DefaultNatsManualAck.java new file mode 100644 index 0000000000000..7d3b011663ba7 --- /dev/null +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/DefaultNatsManualAck.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.nats; + +import java.time.Duration; + +import io.nats.client.Message; + +class DefaultNatsManualAck implements NatsManualAck { + + private final Message message; + + DefaultNatsManualAck(Message message) { + this.message = message; + } + + @Override + public void ack() { + message.ack(); + } + + @Override + public void nak() { + message.nak(); + } + + @Override + public void nakWithDelay(Duration delay) { + message.nakWithDelay(delay); + } + + @Override + public void term() { + message.term(); + } + + @Override + public void inProgress() { + message.inProgress(); + } +} diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java index ea2fa55136a50..00b489098c7bf 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java @@ -83,6 +83,8 @@ public class NatsConfiguration implements Cloneable { private long nackWait = 5000; @UriParam(label = "consumer") private long maxDeliver; + @UriParam(label = "consumer", defaultValue = "false") + private boolean manualAck; @UriParam(label = "consumer", defaultValue = "10") private int poolSize = 10; @UriParam(label = "common", defaultValue = "true") @@ -679,4 +681,25 @@ public long getMaxDeliver() { public void setMaxDeliver(long maxDeliver) { this.maxDeliver = maxDeliver; } + + public boolean isManualAck() { + return manualAck; + } + + /** + * Whether to allow doing manual acknowledgment via {@link NatsManualAck}. + *

+ * If this option is enabled then an instance of {@link NatsManualAck} is stored on the + * {@link org.apache.camel.Exchange} message header, which allows end users to access this API and perform manual + * ack/nak/term operations via the JetStream consumer. + *

+ * When enabled, the automatic acknowledgment on exchange completion is disabled. If the user does not call any ack + * method, the message remains unacknowledged and NATS will redeliver it after the ackWait timeout expires. + *

+ * This option is only applicable when JetStream is enabled (jetstreamEnabled=true). It has no effect when + * ackPolicy=None since the server acknowledges messages automatically on delivery. + */ + public void setManualAck(boolean manualAck) { + this.manualAck = manualAck; + } } diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java index a897652ca6d8e..a0a1c883614f5 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java @@ -38,6 +38,10 @@ public interface NatsConstants { description = "Number of times this message has been delivered (1 = first, > 1 then message has been redelivered)", javaType = "long", important = true) String NATS_DELIVERY_COUNTER = "CamelNatsDeliveryCounter"; + @Metadata(label = "consumer", + description = "The manual acknowledgment handle for JetStream messages (only set when manualAck=true).", + javaType = "org.apache.camel.component.nats.NatsManualAck") + String NATS_MANUAL_ACK = "CamelNatsManualAck"; String NATS_REQUEST_TIMEOUT_THREAD_PROFILE_NAME = "CamelNatsRequestTimeoutExecutor"; } diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java index c36d0be1fdccf..2a1ce6df61138 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java @@ -78,7 +78,15 @@ protected void doStart() throws Exception { ? this.getEndpoint().getConfiguration().getConnection() : this.getEndpoint().getConnection(); - this.executor.submit(new NatsConsumingTask(this.connection, this.getEndpoint().getConfiguration())); + NatsConfiguration config = this.getEndpoint().getConfiguration(); + if (config.isManualAck() && !config.isJetstreamEnabled()) { + LOG.warn("manualAck=true has no effect without jetstreamEnabled=true; standard NATS has no acknowledgment"); + } + if (config.isManualAck() && config.getAckPolicy() == AckPolicy.None) { + LOG.warn( + "manualAck=true with ackPolicy=None: the server acknowledges automatically on delivery, manual ack/nak calls will have no effect"); + } + this.executor.submit(new NatsConsumingTask(this.connection, config)); } @Override @@ -304,35 +312,38 @@ private void setupStandardNatsConsumer(String topic, String queueName, Integer m class CamelNatsMessageHandler implements MessageHandler { final boolean ackPolicyNone = configuration.getAckPolicy() == AckPolicy.None; + final boolean manualAckEnabled = configuration.isManualAck() && configuration.isJetstreamEnabled(); @Override public void onMessage(Message msg) throws InterruptedException { LOG.debug("Received Message: {}", msg); final Exchange exchange = NatsConsumer.this.createExchange(false); - exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { - @Override - public void onComplete(Exchange exchange) { - LOG.debug("ACK"); - msg.ack(); - } - - @Override - public void onFailure(Exchange exchange) { - if (ackPolicyNone) { - // ACK policy is none which means that we should auto ACK even if the message processed failed in Camel + if (!manualAckEnabled) { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { + @Override + public void onComplete(Exchange exchange) { LOG.debug("ACK"); msg.ack(); - } else { - LOG.debug("NACK (delay:{})", configuration.getNackWait()); - if (configuration.getNackWait() <= 0) { - msg.nak(); + } + + @Override + public void onFailure(Exchange exchange) { + if (ackPolicyNone) { + // ACK policy is none which means that we should auto ACK even if the message processed failed in Camel + LOG.debug("ACK"); + msg.ack(); } else { - msg.nakWithDelay(configuration.getNackWait()); + LOG.debug("NACK (delay:{})", configuration.getNackWait()); + if (configuration.getNackWait() <= 0) { + msg.nak(); + } else { + msg.nakWithDelay(configuration.getNackWait()); + } } } - } - }); + }); + } try { exchange.getIn().setBody(msg.getData()); exchange.getIn().setHeader(NatsConstants.NATS_REPLY_TO, msg.getReplyTo()); @@ -367,6 +378,10 @@ public void onFailure(Exchange exchange) { exchange.getMessage().setPayloadForTrait(MessageTrait.REDELIVERY, evalRedeliveryMessageTrait(msg, exchange)); + if (manualAckEnabled) { + exchange.getIn().setHeader(NatsConstants.NATS_MANUAL_ACK, new DefaultNatsManualAck(msg)); + } + NatsConsumer.this.processor.process(exchange); // is there a reply? diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsManualAck.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsManualAck.java new file mode 100644 index 0000000000000..e50ecd463911a --- /dev/null +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsManualAck.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.nats; + +import java.time.Duration; + +/** + * Allows manual acknowledgment of JetStream messages when using the NATS consumer with {@code manualAck=true}. + * + * @see NatsConstants#NATS_MANUAL_ACK + */ +public interface NatsManualAck { + + /** + * Acknowledge the message. + */ + void ack(); + + /** + * Negative acknowledge the message. The message will be redelivered immediately. + */ + void nak(); + + /** + * Negative acknowledge the message with a delay before redelivery. + */ + void nakWithDelay(Duration delay); + + /** + * Terminate delivery of this message. The server will stop redelivering it. + */ + void term(); + + /** + * Signal that processing is still in progress. Resets the {@code ackWait} timer to prevent the server from + * redelivering while long-running processing is ongoing. + */ + void inProgress(); +} diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerManualAckIT.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerManualAckIT.java new file mode 100644 index 0000000000000..84ad28d4863f4 --- /dev/null +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerManualAckIT.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.nats.jetstream; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.nats.NatsConstants; +import org.apache.camel.component.nats.NatsManualAck; +import org.apache.camel.component.nats.integration.NatsITSupport; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@Isolated +public class NatsJetstreamConsumerManualAckIT extends NatsITSupport { + + @EndpointInject("mock:result") + protected MockEndpoint mockResultEndpoint; + + @EndpointInject("mock:input") + protected MockEndpoint mockInputEndpoint; + + @Test + public void testManualAck() throws Exception { + mockResultEndpoint.expectedBodiesReceived("Hello World"); + mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_SUBJECT, "mytopic-manualack"); + + template.sendBody("direct:send", "Hello World"); + + mockResultEndpoint.setAssertPeriod(5000); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testManualNakWithDelay() throws Exception { + mockResultEndpoint.expectedBodiesReceived("Hello World"); + mockResultEndpoint.expectedHeaderReceived("counter", 2); + mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_DELIVERY_COUNTER, 2); + + mockInputEndpoint.expectedMessageCount(2); + mockInputEndpoint.message(0).header(NatsConstants.NATS_DELIVERY_COUNTER).isEqualTo(1); + mockInputEndpoint.message(1).header(NatsConstants.NATS_DELIVERY_COUNTER).isEqualTo(2); + + template.sendBody("direct:send-nakdelay", "Hello World"); + + mockResultEndpoint.setAssertPeriod(5000); + mockInputEndpoint.setAssertPeriod(5000); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testManualTerm() throws Exception { + mockInputEndpoint.expectedMessageCount(1); + mockInputEndpoint.expectedHeaderReceived(NatsConstants.NATS_DELIVERY_COUNTER, 1); + + template.sendBody("direct:send-term", "Hello World"); + + mockInputEndpoint.setAssertPeriod(5000); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testManualNak() throws Exception { + mockResultEndpoint.expectedBodiesReceived("Hello World"); + mockResultEndpoint.expectedHeaderReceived("counter", 2); + mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_DELIVERY_COUNTER, 2); + + mockInputEndpoint.expectedMessageCount(2); + mockInputEndpoint.message(0).header(NatsConstants.NATS_DELIVERY_COUNTER).isEqualTo(1); + mockInputEndpoint.message(1).header(NatsConstants.NATS_DELIVERY_COUNTER).isEqualTo(2); + + template.sendBody("direct:send-nak", "Hello World"); + + mockResultEndpoint.setAssertPeriod(5000); + mockInputEndpoint.setAssertPeriod(5000); + MockEndpoint.assertIsSatisfied(context); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + String ackUri + = "nats:mytopic-manualack?jetstreamEnabled=true&jetstreamName=mystream-manualack&jetstreamAsync=false&durableName=camel-manualack&pullSubscription=false&manualAck=true"; + String nakUri + = "nats:mytopic-manualack-nak?jetstreamEnabled=true&jetstreamName=mystream-manualack-nak&jetstreamAsync=false&durableName=camel-manualack-nak&pullSubscription=false&manualAck=true&nackWait=10"; + String nakDelayUri + = "nats:mytopic-manualack-nakdelay?jetstreamEnabled=true&jetstreamName=mystream-manualack-nakdelay&jetstreamAsync=false&durableName=camel-manualack-nakdelay&pullSubscription=false&manualAck=true"; + String termUri + = "nats:mytopic-manualack-term?jetstreamEnabled=true&jetstreamName=mystream-manualack-term&jetstreamAsync=false&durableName=camel-manualack-term&pullSubscription=false&manualAck=true&nackWait=10&maxDeliver=5"; + + from("direct:send") + .errorHandler(defaultErrorHandler().maximumRedeliveries(5)) + .to(ackUri); + + from(ackUri) + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + NatsManualAck manualAck + = exchange.getIn().getHeader(NatsConstants.NATS_MANUAL_ACK, NatsManualAck.class); + assertNotNull(manualAck); + assertInstanceOf(NatsManualAck.class, manualAck); + manualAck.ack(); + } + }) + .to(mockResultEndpoint); + + from("direct:send-nakdelay") + .errorHandler(defaultErrorHandler().maximumRedeliveries(5)) + .to(nakDelayUri); + + final AtomicInteger nakDelayCounter = new AtomicInteger(); + from(nakDelayUri) + .to("mock:input") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + NatsManualAck manualAck + = exchange.getIn().getHeader(NatsConstants.NATS_MANUAL_ACK, NatsManualAck.class); + if (nakDelayCounter.incrementAndGet() < 2) { + manualAck.nakWithDelay(Duration.ofMillis(100)); + exchange.setRouteStop(true); + } else { + manualAck.ack(); + exchange.getMessage().setHeader("counter", nakDelayCounter.intValue()); + } + } + }) + .to(mockResultEndpoint); + + from("direct:send-term") + .errorHandler(defaultErrorHandler().maximumRedeliveries(5)) + .to(termUri); + + from(termUri) + .to("mock:input") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + NatsManualAck manualAck + = exchange.getIn().getHeader(NatsConstants.NATS_MANUAL_ACK, NatsManualAck.class); + manualAck.term(); + exchange.setRouteStop(true); + } + }); + + from("direct:send-nak") + .errorHandler(defaultErrorHandler().maximumRedeliveries(5)) + .to(nakUri); + + final AtomicInteger counter = new AtomicInteger(); + from(nakUri) + .to("mock:input") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + NatsManualAck manualAck + = exchange.getIn().getHeader(NatsConstants.NATS_MANUAL_ACK, NatsManualAck.class); + if (counter.incrementAndGet() < 2) { + manualAck.nak(); + exchange.setRouteStop(true); + } else { + manualAck.ack(); + exchange.getMessage().setHeader("counter", counter.intValue()); + } + } + }) + .to(mockResultEndpoint); + } + }; + } +} diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerManualAckPullIT.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerManualAckPullIT.java new file mode 100644 index 0000000000000..4360130405c83 --- /dev/null +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerManualAckPullIT.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.nats.jetstream; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.nats.NatsConstants; +import org.apache.camel.component.nats.NatsManualAck; +import org.apache.camel.component.nats.integration.NatsITSupport; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@Isolated +public class NatsJetstreamConsumerManualAckPullIT extends NatsITSupport { + + @EndpointInject("mock:result") + protected MockEndpoint mockResultEndpoint; + + @EndpointInject("mock:input") + protected MockEndpoint mockInputEndpoint; + + @Test + public void testManualAck() throws Exception { + mockResultEndpoint.expectedBodiesReceived("Hello World"); + mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_SUBJECT, "mytopic-manualack-pull"); + + template.sendBody("direct:send", "Hello World"); + + mockResultEndpoint.setAssertPeriod(5000); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testManualNak() throws Exception { + mockResultEndpoint.expectedBodiesReceived("Hello World"); + mockResultEndpoint.expectedHeaderReceived("counter", 2); + mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_DELIVERY_COUNTER, 2); + + mockInputEndpoint.expectedMessageCount(2); + mockInputEndpoint.message(0).header(NatsConstants.NATS_DELIVERY_COUNTER).isEqualTo(1); + mockInputEndpoint.message(1).header(NatsConstants.NATS_DELIVERY_COUNTER).isEqualTo(2); + + template.sendBody("direct:send-nak", "Hello World"); + + mockResultEndpoint.setAssertPeriod(5000); + mockInputEndpoint.setAssertPeriod(5000); + MockEndpoint.assertIsSatisfied(context); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + String ackUri + = "nats:mytopic-manualack-pull?jetstreamEnabled=true&jetstreamName=mystream-manualack-pull&jetstreamAsync=false&durableName=camel-manualack-pull&pullSubscription=true&pullFetchTimeout=500&manualAck=true"; + String nakUri + = "nats:mytopic-manualack-pull-nak?jetstreamEnabled=true&jetstreamName=mystream-manualack-pull-nak&jetstreamAsync=false&durableName=camel-manualack-pull-nak&pullSubscription=true&pullFetchTimeout=500&manualAck=true&nackWait=10"; + + from("direct:send") + .errorHandler(defaultErrorHandler().maximumRedeliveries(5)) + .to(ackUri); + + from(ackUri) + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + NatsManualAck manualAck + = exchange.getIn().getHeader(NatsConstants.NATS_MANUAL_ACK, NatsManualAck.class); + assertNotNull(manualAck); + manualAck.ack(); + } + }) + .to(mockResultEndpoint); + + from("direct:send-nak") + .errorHandler(defaultErrorHandler().maximumRedeliveries(5)) + .to(nakUri); + + final AtomicInteger counter = new AtomicInteger(); + from(nakUri) + .to("mock:input") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + NatsManualAck manualAck + = exchange.getIn().getHeader(NatsConstants.NATS_MANUAL_ACK, NatsManualAck.class); + if (counter.incrementAndGet() < 2) { + manualAck.nak(); + exchange.setRouteStop(true); + } else { + manualAck.ack(); + exchange.getMessage().setHeader("counter", counter.intValue()); + } + } + }) + .to(mockResultEndpoint); + } + }; + } +} diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/NatsComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/NatsComponentBuilderFactory.java index 74051188ea21f..e7a855382e1cc 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/NatsComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/NatsComponentBuilderFactory.java @@ -433,6 +433,33 @@ default NatsComponentBuilder durableName(java.lang.String durableName) { return this; } + + /** + * Whether to allow doing manual acknowledgment via NatsManualAck. If + * this option is enabled then an instance of NatsManualAck is stored on + * the org.apache.camel.Exchange message header, which allows end users + * to access this API and perform manual ack/nak/term operations via the + * JetStream consumer. When enabled, the automatic acknowledgment on + * exchange completion is disabled. If the user does not call any ack + * method, the message remains unacknowledged and NATS will redeliver it + * after the ackWait timeout expires. This option is only applicable + * when JetStream is enabled (jetstreamEnabled=true). It has no effect + * when ackPolicy=None since the server acknowledges messages + * automatically on delivery. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: consumer + * + * @param manualAck the value to set + * @return the dsl builder + */ + default NatsComponentBuilder manualAck(boolean manualAck) { + doSetProperty("manualAck", manualAck); + return this; + } + /** * Maximum number of attempts to deliver a message from Nats to a * consumer. Once MaxDeliver is reached, the NATS server stops @@ -875,6 +902,7 @@ protected boolean setPropertyOnComponent( case "ackWait": getOrCreateConfiguration((NatsComponent) component).setAckWait((long) value); return true; case "bridgeErrorHandler": ((NatsComponent) component).setBridgeErrorHandler((boolean) value); return true; case "durableName": getOrCreateConfiguration((NatsComponent) component).setDurableName((java.lang.String) value); return true; + case "manualAck": getOrCreateConfiguration((NatsComponent) component).setManualAck((boolean) value); return true; case "maxDeliver": getOrCreateConfiguration((NatsComponent) component).setMaxDeliver((long) value); return true; case "maxMessages": getOrCreateConfiguration((NatsComponent) component).setMaxMessages((java.lang.String) value); return true; case "nackWait": getOrCreateConfiguration((NatsComponent) component).setNackWait((long) value); return true; diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java index 85a1442fb6474..ac2af7e2fc0e1 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java @@ -605,6 +605,56 @@ default NatsEndpointConsumerBuilder durableName(String durableName) { doSetProperty("durableName", durableName); return this; } + /** + * Whether to allow doing manual acknowledgment via NatsManualAck. If + * this option is enabled then an instance of NatsManualAck is stored on + * the org.apache.camel.Exchange message header, which allows end users + * to access this API and perform manual ack/nak/term operations via the + * JetStream consumer. When enabled, the automatic acknowledgment on + * exchange completion is disabled. If the user does not call any ack + * method, the message remains unacknowledged and NATS will redeliver it + * after the ackWait timeout expires. This option is only applicable + * when JetStream is enabled (jetstreamEnabled=true). It has no effect + * when ackPolicy=None since the server acknowledges messages + * automatically on delivery. + * + * The option is a: boolean type. + * + * Default: false + * Group: consumer + * + * @param manualAck the value to set + * @return the dsl builder + */ + default NatsEndpointConsumerBuilder manualAck(boolean manualAck) { + doSetProperty("manualAck", manualAck); + return this; + } + /** + * Whether to allow doing manual acknowledgment via NatsManualAck. If + * this option is enabled then an instance of NatsManualAck is stored on + * the org.apache.camel.Exchange message header, which allows end users + * to access this API and perform manual ack/nak/term operations via the + * JetStream consumer. When enabled, the automatic acknowledgment on + * exchange completion is disabled. If the user does not call any ack + * method, the message remains unacknowledged and NATS will redeliver it + * after the ackWait timeout expires. This option is only applicable + * when JetStream is enabled (jetstreamEnabled=true). It has no effect + * when ackPolicy=None since the server acknowledges messages + * automatically on delivery. + * + * The option will be converted to a boolean type. + * + * Default: false + * Group: consumer + * + * @param manualAck the value to set + * @return the dsl builder + */ + default NatsEndpointConsumerBuilder manualAck(String manualAck) { + doSetProperty("manualAck", manualAck); + return this; + } /** * Maximum number of attempts to deliver a message from Nats to a * consumer. Once MaxDeliver is reached, the NATS server stops @@ -2861,6 +2911,20 @@ public String natsStatusError() { public String natsDeliveryCounter() { return "CamelNatsDeliveryCounter"; } + /** + * The manual acknowledgment handle for JetStream messages (only set + * when manualAck=true). + * + * The option is a: {@code + * org.apache.camel.component.nats.NatsManualAck} type. + * + * Group: consumer + * + * @return the name of the header {@code NatsManualAck}. + */ + public String natsManualAck() { + return "CamelNatsManualAck"; + } } static NatsEndpointBuilder endpointBuilder(String componentName, String path) { class NatsEndpointBuilderImpl extends AbstractEndpointBuilder implements NatsEndpointBuilder, AdvancedNatsEndpointBuilder { diff --git a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthService.java b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthService.java index 1c8cd672dd48c..eb322a363becb 100644 --- a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthService.java +++ b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthService.java @@ -29,6 +29,7 @@ protected GenericContainer initContainer(String imageName, String containerName) container .waitingFor(Wait.forLogMessage(".*Server.*is.*ready.*", 1)) + .waitingFor(Wait.forListeningPort()) .withCommand("--jetstream", "-DV", "--user", USERNAME, "--pass", PASSWORD); return container; diff --git a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthTokenService.java b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthTokenService.java index e1d8a3bc45d09..5f804a61b3910 100644 --- a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthTokenService.java +++ b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthTokenService.java @@ -28,6 +28,7 @@ protected GenericContainer initContainer(String imageName, String containerName) container .waitingFor(Wait.forLogMessage(".*Server.*is.*ready.*", 1)) + .waitingFor(Wait.forListeningPort()) .withCommand("--jetstream", "-DV", "-auth", TOKEN); return container; diff --git a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerInfraService.java b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerInfraService.java index ab052d5b1540d..7fb420363b3f3 100644 --- a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerInfraService.java +++ b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerInfraService.java @@ -55,6 +55,7 @@ public NatsContainer(boolean fixedPort) { withNetworkAliases(containerName) .waitingFor(Wait.forLogMessage(".*Server.*is.*ready.*", 1)) + .waitingFor(Wait.forListeningPort()) .withCommand("--jetstream"); ContainerEnvironmentUtil.configurePort(this, fixedPort, PORT); diff --git a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerTLSAuthService.java b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerTLSAuthService.java index 4e165c874db4d..24bd5e836b892 100644 --- a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerTLSAuthService.java +++ b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerTLSAuthService.java @@ -28,6 +28,7 @@ protected GenericContainer initContainer(String imageName, String containerName) container .waitingFor(Wait.forLogMessage(".*Server.*is.*ready.*", 1)) + .waitingFor(Wait.forListeningPort()) .withClasspathResourceMapping("org/apache/camel/test/infra/nats/services", "/nats", BindMode.READ_ONLY) .withCommand("--jetstream", "--tls",