Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 39 additions & 93 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1348,9 +1348,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
----

==== Exponential Backoff Error Handler
This error handler implements an exponential backoff strategy for retrying failed SQS message processing.

The backoff duration is computed using the `ApproximateReceiveCount` message attribute, applying an exponential function to determine the delay. Once calculated, the handler sets the message's visibility timeout to the computed backoff value, postponing the message's reprocessing accordingly.
Implements exponential backoff for retrying failed SQS message processing. The base backoff is computed from `ApproximateReceiveCount` as
`initialVisibilityTimeoutSeconds * multiplier^(receiveCount - 1)`, capped by `maxVisibilityTimeoutSeconds`.

[cols="2,3,1,1"]
|===
Expand All @@ -1362,7 +1361,7 @@ The backoff duration is computed using the `ApproximateReceiveCount` message att

NOTE: The maximum visibility timeout allowed by SQS is 43200 seconds (12 hours). If the value provided to the `maxVisibilityTimeoutSeconds` parameter exceeds this limit, an `IllegalArgumentException` will be thrown.

When using auto-configured factory, simply declare a `@Bean` and the error handler will be set
When using the auto-configured factory, a single `@Bean` can be declared and the error handler will be set:

[source, java]
----
Expand All @@ -1377,32 +1376,23 @@ public ExponentialBackoffErrorHandler<Object> asyncErrorHandler() {
}
----

Alternatively, `ExponentialBackoffErrorHandler` can be set in the `MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:
Alternatively, set it via `SqsMessageListenerContainerFactory#errorHandler(...)` or directly on a container if configuring programmatically.

[source, java]
----
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
return SqsMessageListenerContainerFactory
.builder()
.sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
.errorHandler(ExponentialBackoffErrorHandler
.builder()
.initialVisibilityTimeoutSeconds(1)
.multiplier(2)
.maxVisibilityTimeoutSeconds(10)
.build())
.build();
}
----
===== Jitter
Jitter randomizes the final visibility timeout derived from the exponential calculation to avoid synchronized retries ("thundering herd"). It can be configured on the builder via `jitter(...)`.

`ExponentialBackoffErrorHandler` supports the following strategies:

- `Jitter.NONE` (default): no randomization; uses the exact exponential timeout.
- `Jitter.FULL`: picks a random value uniformly in [0, timeout].
- `Jitter.HALF`: picks a random value uniformly in [ceil(timeout/2), timeout].

==== Exponential Backoff Full Jitter Error Handler
This error handler applies an exponential backoff strategy with *full jitter*
when retrying failed message processing. After calculating the exponential
visibility timeout using the `ApproximateReceiveCount` message attribute, a
random value between zero and the computed timeout is selected. The selected
value becomes the new visibility timeout, spreading retries and helping to
avoid spikes caused by synchronized retries.
The random source can be overridden with `randomSupplier(...)`.

====== Full Jitter
Applies exponential backoff with *full jitter* when retrying failed message processing. After calculating the exponential
visibility timeout using the `ApproximateReceiveCount` message attribute, a random value between zero and the computed timeout is selected.
The selected value becomes the new visibility timeout, spreading retries and helping to avoid spikes caused by synchronized retries.

[cols="2,3,1,1"]
|===
Expand All @@ -1421,47 +1411,24 @@ NOTE: The maximum visibility timeout allowed by SQS is 43200 seconds (12 hours).
If the value provided to the `maxVisibilityTimeoutSeconds` parameter exceeds
this limit, an `IllegalArgumentException` will be thrown.

When using auto-configured factory, simply declare a `@Bean` and the error
handler will be set
To enable full jitter, `jitter(Jitter.FULL)` should be set on the builder:

[source, java]
----
@Bean
public ExponentialBackoffErrorHandlerWithFullJitter<Object> asyncErrorHandler() {
return ExponentialBackoffErrorHandlerWithFullJitter
.builder()
.initialVisibilityTimeoutSeconds(1)
.multiplier(2)
.maxVisibilityTimeoutSeconds(10)
.build();
}
ExponentialBackoffErrorHandler<Object> handler = ExponentialBackoffErrorHandler
.builder()
.initialVisibilityTimeoutSeconds(1)
.multiplier(2)
.maxVisibilityTimeoutSeconds(10)
.jitter(Jitter.FULL)
.build();
----

Alternatively, `ExponentialBackoffErrorHandlerWithFullJitter` can be set in the
`MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:

[source, java]
----
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
return SqsMessageListenerContainerFactory
.builder()
.sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
.errorHandler(ExponentialBackoffErrorHandlerWithFullJitter
.builder()
.initialVisibilityTimeoutSeconds(1)
.multiplier(2)
.maxVisibilityTimeoutSeconds(10)
.build())
.build();
}
----
The handler can then be passed to a factory or container as needed.

==== Exponential Backoff Half Jitter Error Handler
This variant also computes the visibility timeout exponentially but applies
*half jitter*. The exponential delay is halved and a random value between zero
and this half is added to it. The resulting timeout is then used to change the
message visibility.
====== Half Jitter
Computes the visibility timeout exponentially and applies *half jitter*. A random value is selected uniformly between ceil(timeout/2) and timeout.
The selected value becomes the new visibility timeout, providing moderate randomization while maintaining a reasonable minimum delay.

[cols="2,3,1,1"]
|===
Expand All @@ -1480,41 +1447,20 @@ NOTE: The maximum visibility timeout allowed by SQS is 43200 seconds (12 hours).
If the value provided to the `maxVisibilityTimeoutSeconds` parameter exceeds
this limit, an `IllegalArgumentException` will be thrown.

When using auto-configured factory, simply declare a `@Bean` and the error
handler will be set
To enable half jitter, `jitter(Jitter.HALF)` on the builder:

[source, java]
----
@Bean
public ExponentialBackoffErrorHandlerWithHalfJitter<Object> asyncErrorHandler() {
return ExponentialBackoffErrorHandlerWithHalfJitter
.builder()
.initialVisibilityTimeoutSeconds(1)
.multiplier(2)
.maxVisibilityTimeoutSeconds(10)
.build();
}
ExponentialBackoffErrorHandler<Object> handler = ExponentialBackoffErrorHandler
.builder()
.initialVisibilityTimeoutSeconds(1)
.multiplier(2)
.maxVisibilityTimeoutSeconds(10)
.jitter(Jitter.HALF)
.build();
----

Alternatively, `ExponentialBackoffErrorHandlerWithHalfJitter` can be set in the
`MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:

[source, java]
----
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
return SqsMessageListenerContainerFactory
.builder()
.sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
.errorHandler(ExponentialBackoffErrorHandlerWithHalfJitter
.builder()
.initialVisibilityTimeoutSeconds(1)
.multiplier(2)
.maxVisibilityTimeoutSeconds(10)
.build())
.build();
}
----
The handler can then be passed to a factory or container as needed.

==== Linear Backoff Error Handler
`LinearBackoffErrorHandler` increases the visibility timeout linearly whenever a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import io.awspring.cloud.sqs.listener.BatchVisibility;
import io.awspring.cloud.sqs.listener.Visibility;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
Expand All @@ -38,6 +41,8 @@
*
* @author Bruno Garcia
* @author Rafael Pavarini
* @author Tomaz Fernandes
*
*/

public class ExponentialBackoffErrorHandler<T> implements AsyncErrorHandler<T> {
Expand All @@ -46,12 +51,16 @@ public class ExponentialBackoffErrorHandler<T> implements AsyncErrorHandler<T> {
private final int initialVisibilityTimeoutSeconds;
private final double multiplier;
private final int maxVisibilityTimeoutSeconds;
private final Supplier<Random> randomSupplier;
private final Jitter jitter;

private ExponentialBackoffErrorHandler(int initialVisibilityTimeoutSeconds, double multiplier,
int maxVisibilityTimeoutSeconds) {
int maxVisibilityTimeoutSeconds, Supplier<Random> randomSupplier, Jitter jitter) {
this.initialVisibilityTimeoutSeconds = initialVisibilityTimeoutSeconds;
this.multiplier = multiplier;
this.maxVisibilityTimeoutSeconds = maxVisibilityTimeoutSeconds;
this.randomSupplier = randomSupplier;
this.jitter = jitter;
}

@Override
Expand Down Expand Up @@ -104,8 +113,12 @@ private int calculateTimeout(Message<T> message) {
}

private int calculateTimeout(long receiveMessageCount) {
return ErrorHandlerVisibilityHelper.calculateVisibilityTimeoutExponentially(receiveMessageCount,
int timeout = ErrorHandlerVisibilityHelper.calculateVisibilityTimeoutExponentially(receiveMessageCount,
initialVisibilityTimeoutSeconds, multiplier, maxVisibilityTimeoutSeconds);
int jitteredTimeout = jitter.applyJitter(new Jitter.Context(timeout, randomSupplier));
logger.debug("Exponential backoff jitter applied: original={}, jittered={}, receiveCount={}, jitterType={}",
timeout, jitteredTimeout, receiveMessageCount, jitter.getClass().getSimpleName());
return jitteredTimeout;
}

public static <T> Builder<T> builder() {
Expand All @@ -117,6 +130,8 @@ public static class Builder<T> {
private int initialVisibilityTimeoutSeconds = BackoffVisibilityConstants.DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS;
private double multiplier = BackoffVisibilityConstants.DEFAULT_MULTIPLIER;
private int maxVisibilityTimeoutSeconds = BackoffVisibilityConstants.DEFAULT_MAX_VISIBILITY_TIMEOUT_SECONDS;
private Supplier<Random> randomSupplier = ThreadLocalRandom::current;
private Jitter jitter = Jitter.NONE;

public Builder<T> initialVisibilityTimeoutSeconds(int initialVisibilityTimeoutSeconds) {
ErrorHandlerVisibilityHelper.checkVisibilityTimeout(initialVisibilityTimeoutSeconds);
Expand All @@ -137,11 +152,23 @@ public Builder<T> maxVisibilityTimeoutSeconds(int maxVisibilityTimeoutSeconds) {
return this;
}

public Builder<T> randomSupplier(Supplier<Random> randomSupplier) {
Assert.notNull(randomSupplier, "randomSupplier cannot be null");
this.randomSupplier = randomSupplier;
return this;
}

public Builder<T> jitter(Jitter jitter) {
Assert.notNull(jitter, "jitter cannot be null");
this.jitter = jitter;
return this;
}

public ExponentialBackoffErrorHandler<T> build() {
Assert.isTrue(initialVisibilityTimeoutSeconds <= maxVisibilityTimeoutSeconds,
"Initial visibility timeout must not exceed max visibility timeout");
return new ExponentialBackoffErrorHandler<>(initialVisibilityTimeoutSeconds, multiplier,
maxVisibilityTimeoutSeconds);
maxVisibilityTimeoutSeconds, randomSupplier, jitter);
}
}
}
Loading