Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAGO-67282: NACK Support for Consumer bindings #270

Merged
merged 9 commits into from
Mar 15, 2024
1 change: 0 additions & 1 deletion .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,3 @@ updates:
- dependency-name: "com.solace.*"
- dependency-name: "com.solacesystems:*"
- dependency-name: "org.springframework.*"
- dependency-name: "org.apache.logging.log4j:*"
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ For information about Solace Projects that are only for Spring Boot, please visi
* [Repository Contents](#repository-contents)
* [Building Locally](#building-locally)
* [Release Process](#release-process)
* [Samples](#solace-spring-cloud-samples)
* [Contributing](#contributing)
* [Authors](#authors)
* [License](#license)
Expand Down Expand Up @@ -102,7 +103,9 @@ Parallel test execution is disabled by default. Add the `-Djunit.jupiter.executi
```shell script
mvn -B release:prepare
```
## Solace Spring Cloud Samples

See [solace-samples-spring](https://github.com/SolaceSamples/solace-samples-spring) repository for samples.

## Contributing

Expand Down Expand Up @@ -144,11 +147,13 @@ For more information about Spring Cloud try these resources:

For more information about Solace technology for Spring Boot please visit these resources:
- [Solace Spring Boot](//github.com/SolaceProducts/solace-spring-boot)
- [Solace Spring Samples](https://github.com/SolaceSamples/solace-samples-spring)

For more information about Solace technology in general please visit these resources:

- The [Solace Developer Portal](https://solace.dev)
- Ask the [Solace community](https://solace.community/)
- Checkout the [Solace Blogs](https://solace.com/blog/?fwp_blog_search=spring%20cloud%20stream)

```
.......................HELLO FROM THE OTTER SIDE...........
Expand Down
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-build</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Solace Spring Cloud Build</name>
Expand All @@ -21,7 +21,7 @@
<repoName>SolaceProducts</repoName>

<!-- This is the version of Spring Cloud we have targeted for this build -->
<spring.cloud.version>2022.0.4</spring.cloud.version>
<spring.cloud.version>2023.0.0</spring.cloud.version>

<!-- This is the version of Solace Spring Boot we have targeted for this build -->
<!-- This also dictates the expected version of Spring Boot -->
Expand All @@ -30,9 +30,9 @@

<!-- Override spring-boot version from solace-spring-boot to latest patch version -->
<!-- Remove this if the next version of solace-spring-boot works fine -->
<spring.boot.version>3.1.5</spring.boot.version>
<spring.boot.version>3.2.3</spring.boot.version>

<solace.spring.cloud.stream-starter.version>4.1.1-SNAPSHOT</solace.spring.cloud.stream-starter.version>
<solace.spring.cloud.stream-starter.version>5.0.0-SNAPSHOT</solace.spring.cloud.stream-starter.version>

<solace.integration.test.support.version>1.0.2</solace.integration.test.support.version>
<solace.integration.test.support.fetch_checkout.skip>false</solace.integration.test.support.fetch_checkout.skip>
Expand Down
4 changes: 2 additions & 2 deletions solace-spring-cloud-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-build</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>solace-spring-cloud-bom</artifactId>
<packaging>pom</packaging>
<version>3.1.1-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>

<name>Solace Spring Cloud BOM</name>
<description>BOM for Solace Spring Cloud</description>
Expand Down
18 changes: 3 additions & 15 deletions solace-spring-cloud-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-build</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand All @@ -23,24 +23,12 @@
<maven.compiler.target>17</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>

<solace.jcsmp.version>10.21.0</solace.jcsmp.version>
<solace.jms.version>10.21.0</solace.jms.version>
<solace.jcsmp.version>10.22.0</solace.jcsmp.version>
<solace.jms.version>10.22.0</solace.jms.version>
</properties>

<dependencyManagement>
<dependencies>
<!--
Workaround for CVE-2021-45046
Remove after upgrading to Spring Boot >= 2.6.2
-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-bom</artifactId>
<version>2.21.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<!-- Import Solace Spring Boot dependency management -->
<groupId>com.solace.spring.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
= Spring Cloud Stream Binder for Solace PubSub+
:revnumber: 4.1.0
:revnumber: 5.0.0
:toc: preamble
:toclevels: 3
:icons: font
:scst-version: 4.0.4
:scst-version: 4.1.x

// Github-Specific Settings
ifdef::env-github[]
Expand Down Expand Up @@ -139,6 +139,10 @@ spring:
----
<1> The latter half of this configuration where the Solace session is configured actually originates from the https://github.com/SolaceProducts/solace-spring-boot/tree/master/solace-spring-boot-starters/solace-java-spring-boot-starter#updating-your-application-properties[JCSMP Spring Boot Auto-Configuration project]. See <<Solace Session Properties>> for more info.

For more samples see https://github.com/SolaceSamples/solace-samples-spring[Solace Spring Cloud Samples] repository.

For step-by-step instructions refer https://tutorials.solace.dev/spring/spring-cloud-stream/[Solace Spring Cloud Stream tutorial] and check out the https://solace.com/blog/?fwp_blog_search=spring%20cloud%20stream[blogs].

== Configuration Options

=== Solace Binder Configuration Options
Expand Down Expand Up @@ -239,26 +243,6 @@ Only applicable when `batchMode` is `false`.
+
Default: `100`

flowPreRebindWaitTimeout::
The maximum time to wait for all unacknowledged messages to be acknowledged before a flow receiver rebind. Will wait forever if set to a value less than `0`.
+
Default: `10000`

flowRebindBackOffInitialInterval::
The initial interval (milliseconds) to back-off when rebinding a flow. +
+
Default: `1000`

flowRebindBackOffMaxInterval::
The maximum interval (milliseconds) to back-off when rebinding a flow. +
+
Default: `30000`

flowRebindBackOffMultiplier::
The multiplier to apply to the back-off interval between each rebind of a flow. +
+
Default: `1.5`

batchMaxSize::
The maximum number of messages per batch. +
Only applicable when `batchMode` is `true`.
Expand Down Expand Up @@ -904,7 +888,7 @@ public void consume(Message<?> message) {
<1> Get the message's acknowledgement callback header
<2> Disable auto-acknowledgement
<3> Acknowledge the message with the `ACCEPT` status
<4> Handle any acknowledgment exceptions (mostly `SolaceStaleMessageException`)
<4> Handle any acknowledgment exceptions

Refer to the https://docs.spring.io/spring-integration/api/org/springframework/integration/acks/AckUtils.html[AckUtils documentation] and https://javadoc.io/doc/org.springframework.integration/spring-integration-core/latest/org/springframework/integration/acks/AcknowledgmentCallback.html[AcknowledgmentCallback documentation] for more info on these objects.

Expand All @@ -921,12 +905,12 @@ For each acknowledgement status, the binder will perform the following actions:
| Acknowledge the message.

| REJECT
| If `autoBindErrorQueue` is `true`, then republish the message onto the error queue and `ACCEPT` it. Otherwise, if the consumer is in a defined consumer group, invoke `REQUEUE`. Otherwise, the consumer is in an anonymous group, and the message will be discarded.
| If `autoBindErrorQueue` is `true`, then republish the message onto the error queue and `ACCEPT` it. Otherwise, For both, the consumer in a defined consumer group or in an anonymous group, signal the Solace broker to discard/remove the message from queue.

Refer to <<Failed Consumer Message Error Handling>> for more info.

| REQUEUE
| If the consumer is in a defined consumer group, rebind the consumer flow. Otherwise, a `SolaceAcknowledgmentException` will be thrown.
| For both, the consumer in a defined consumer group or in an anonymous group, signal the Solace broker to requeue/redeliver the message. The message will be redelivered until it is `ACCEPTed` or the message’s max redelivery count is exceeded.
Copy link
Collaborator

@Nephery Nephery Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message will be redelivered until it is ACCEPTed or the message’s max redelivery count is exceeded.

I'm not sure if we should be this specific about what the broker does when we signal a REQUEUE settlement. Since how the broker handles settlements is beyond the binder's scope of responsibility.


Refer to <<Message Redelivery>> for more info.
|===
Expand All @@ -938,7 +922,7 @@ Acknowledgements may throw `SolaceAcknowledgmentException` depending on the curr
*Example:* +
(refer to <<Message Redelivery>> for background info)

A `SolaceAcknowledgmentException` with cause `SolaceStaleMessageException` may be thrown when trying to asynchronously `ACCEPT` a stale message after the timeout elapses for the `REQUEUE` of another message. Though for this particular example, since the message that failed to `ACCEPT` will be redelivered, this exception can be caught and ignored if you have no business logic to revert.
A `SolaceAcknowledgmentException` with cause `IllegalStateException` may be thrown when trying to asynchronously `ACCEPT` a message and consumer flow is closed. Though for this particular example, since the message that failed to `ACCEPT` will be redelivered, this exception can be caught and ignored if you have no business logic to revert.
====

NOTE: Manual acknowledgements do not support any application-internal error handling strategies (i.e. retry template, error channel forwarding, etc). Also, throwing an exception in the message handler will always acknowledge the message in some way regardless if auto-acknowledgment is disabled.
Expand Down Expand Up @@ -976,7 +960,7 @@ NOTE: Those 2 headers are cleared from the message before it is sent off to the

== Failed Consumer Message Error Handling

The Spring cloud stream framework already provides a number of application-internal reprocessing strategies for failed messages during message consumption such as. You can read more about that https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling[here]:
The Spring cloud stream framework already provides a number of application-internal reprocessing strategies for failed messages during message consumption. You can read more about that https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling[here]:

However, after all internal error handling strategies have been exhausted, the Solace implementation of the binder would either:

Expand All @@ -989,15 +973,15 @@ A simple error handling strategy in which failed messages are redelivered from t

[IMPORTANT]
====
The Solace API used in this binder implementation does not support individual message redelivery.
The internal implementation of redelivery has changed from Solace Binder v5.0.0.
Previously, redelivery was initiated by rebinding consumer flows; however, as of v5.0.0 and later, the Solace API now leverages the Solace broker's native NACK (Negative Acknowledgement) capabilities.

Here is what happens under the hood when this is triggered:

1. The Solace flow receiver is stopped.
2. Wait until all unacknowledged messages have been acknowledged with a maximum timeout of `flowPreRebindWaitTimeout`. If timed out, the remaining unacknowledged messages will be stale and redelivered from the broker.
3. Rebind the flow.
1. Say the current message is marked for 'REQUEUE'. Any subsequent messages that are currently spooled on the client side, despite having been acknowledged `ACCEPTed` by binder, the Solace broker will discard their ACK.
2. The Solace Broker will redeliver all messages starting with the one tagged as 'REQUEUE', if the message's max redelivery count is not exceeded.
Comment on lines +981 to +982
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as the other PS+ broker question: Should we be this specific about what the broker does when handling settlements?

IMO I think it might be better to have a short description about this and link to the official docs that describes these details (assuming it exists).


Meaning that if unacknowledged messages are not processed in a timely manner, this operation will stall and potentially cause unecessary message duplication.
The redelivery may result in message duplication, and the application should be designed to handle this.
====

=== Error Queue Republishing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-parent</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
<relativePath>../../solace-spring-cloud-parent/pom.xml</relativePath>
</parent>

<artifactId>spring-cloud-starter-stream-solace</artifactId>
<version>4.1.1-SNAPSHOT</version>
<version>5.0.0-SNAPSHOT</version>
<packaging>jar</packaging>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-parent</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
<relativePath>../../solace-spring-cloud-parent/pom.xml</relativePath>
</parent>

<artifactId>spring-cloud-stream-binder-solace-core</artifactId>
<version>4.1.1-SNAPSHOT</version>
<version>5.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Solace Spring Cloud Stream Binder Core</name>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.solace.spring.cloud.stream.binder.inbound;

import com.solace.spring.cloud.stream.binder.inbound.acknowledge.JCSMPAcknowledgementCallbackFactory;
import com.solace.spring.cloud.stream.binder.inbound.acknowledge.SolaceAckUtil;
import com.solace.spring.cloud.stream.binder.meter.SolaceMeterAccessor;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solace.spring.cloud.stream.binder.util.FlowReceiverContainer;
Expand Down Expand Up @@ -65,7 +66,9 @@ void handleMessage(Supplier<Message<?>> messageSupplier, Consumer<Message<?>> se
logger.warn(String.format("Failed to map %s to a Spring Message and no error channel " +
"was configured. Message will be rejected.", isBatched ? "a batch of XMLMessages" :
"an XMLMessage"), e);
AckUtils.reject(acknowledgmentCallback);
if (!SolaceAckUtil.republishToErrorQueue(acknowledgmentCallback)) {
AckUtils.requeue(acknowledgmentCallback);
}
}
return;
}
Expand Down
Loading
Loading