From 08444190d2997ddd1e43e5148a4f5379dc4441d7 Mon Sep 17 00:00:00 2001 From: Tomaz Fernandes Date: Wed, 17 Sep 2025 14:36:02 -0300 Subject: [PATCH] Consolidate ExponentialErrorHandlers * Introduce unified Jitter API (NONE, FULL, HALF) with builder `.jitter(Jitter)` * HALF jitter now samples uniformly in [ceil(timeout/2), timeout]; FULL/NONE unchanged * Centralize logic in ExponentialBackoffErrorHandler * Update docs and tests (unit + integration) --- docs/src/main/asciidoc/sqs.adoc | 132 +++++--------- .../ExponentialBackoffErrorHandler.java | 33 +++- ...tialBackoffErrorHandlerWithFullJitter.java | 162 ----------------- ...tialBackoffErrorHandlerWithHalfJitter.java | 164 ------------------ .../sqs/listener/errorhandler/Jitter.java | 90 ++++++++++ .../SqsErrorHandlerIntegrationTests.java | 16 +- ...onentialBackoffErrorHandlerJitterTest.java | 26 +-- ...onentialBackoffErrorHandlerJitterTest.java | 30 +--- 8 files changed, 187 insertions(+), 466 deletions(-) delete mode 100644 spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandlerWithFullJitter.java delete mode 100644 spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandlerWithHalfJitter.java create mode 100644 spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/Jitter.java diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index e045d0911..41c52dee6 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -1348,9 +1348,8 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac ---- ==== Exponential Backoff Error Handler -This error handler implements an exponential backoff strategy for retrying failed SQS message processing. - -The backoff duration is computed using the `ApproximateReceiveCount` message attribute, applying an exponential function to determine the delay. 
Once calculated, the handler sets the message's visibility timeout to the computed backoff value, postponing the message's reprocessing accordingly. +Implements exponential backoff for retrying failed SQS message processing. The base backoff is computed from `ApproximateReceiveCount` as +`initialVisibilityTimeoutSeconds * multiplier^(receiveCount - 1)`, capped by `maxVisibilityTimeoutSeconds`. [cols="2,3,1,1"] |=== @@ -1362,7 +1361,7 @@ The backoff duration is computed using the `ApproximateReceiveCount` message att NOTE: The maximum visibility timeout allowed by SQS is 43200 seconds (12 hours). If the value provided to the `maxVisibilityTimeoutSeconds` parameter exceeds this limit, an `IllegalArgumentException` will be thrown. -When using auto-configured factory, simply declare a `@Bean` and the error handler will be set +When using the auto-configured factory, a single `@Bean` can be declared and the error handler will be set: [source, java] ---- @@ -1377,32 +1376,23 @@ public ExponentialBackoffErrorHandler asyncErrorHandler() { } ---- -Alternatively, `ExponentialBackoffErrorHandler` can be set in the `MessageListenerContainerFactory` or directly in the `MessageListenerContainer`: +Alternatively, set it via `SqsMessageListenerContainerFactory#errorHandler(...)` or directly on a container if configuring programmatically. -[source, java] ----- -@Bean -public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory() { - return SqsMessageListenerContainerFactory - .builder() - .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient) - .errorHandler(ExponentialBackoffErrorHandler - .builder() - .initialVisibilityTimeoutSeconds(1) - .multiplier(2) - .maxVisibilityTimeoutSeconds(10) - .build()) - .build(); -} ----- +===== Jitter +Jitter randomizes the final visibility timeout derived from the exponential calculation to avoid synchronized retries ("thundering herd"). It can be configured on the builder via `jitter(...)`. 
+ +`ExponentialBackoffErrorHandler` supports the following strategies: + +- `Jitter.NONE` (default): no randomization; uses the exact exponential timeout. +- `Jitter.FULL`: picks a random value in [0, timeout]; a result of 0 is raised to 1 so the timeout is never zero. +- `Jitter.HALF`: picks a random value uniformly in [ceil(timeout/2), timeout]. -==== Exponential Backoff Full Jitter Error Handler -This error handler applies an exponential backoff strategy with *full jitter* -when retrying failed message processing. After calculating the exponential -visibility timeout using the `ApproximateReceiveCount` message attribute, a -random value between zero and the computed timeout is selected. The selected -value becomes the new visibility timeout, spreading retries and helping to -avoid spikes caused by synchronized retries. +The random source can be overridden with `randomSupplier(...)`. + +====== Full Jitter +Applies exponential backoff with *full jitter* when retrying failed message processing. After calculating the exponential +visibility timeout using the `ApproximateReceiveCount` message attribute, a random value between zero and the computed timeout is selected. +The selected value, raised to a minimum of 1 second, becomes the new visibility timeout, spreading retries and helping to avoid spikes caused by synchronized retries. [cols="2,3,1,1"] |=== @@ -1421,47 +1411,24 @@ NOTE: The maximum visibility timeout allowed by SQS is 43200 seconds (12 hours). 
-When using auto-configured factory, simply declare a `@Bean` and the error -handler will be set +To enable full jitter, `jitter(Jitter.FULL)` should be set on the builder: [source, java] ---- -@Bean -public ExponentialBackoffErrorHandlerWithFullJitter asyncErrorHandler() { - return ExponentialBackoffErrorHandlerWithFullJitter - .builder() - .initialVisibilityTimeoutSeconds(1) - .multiplier(2) - .maxVisibilityTimeoutSeconds(10) - .build(); -} +ExponentialBackoffErrorHandler handler = ExponentialBackoffErrorHandler + .builder() + .initialVisibilityTimeoutSeconds(1) + .multiplier(2) + .maxVisibilityTimeoutSeconds(10) + .jitter(Jitter.FULL) + .build(); ---- -Alternatively, `ExponentialBackoffErrorHandlerWithFullJitter` can be set in the -`MessageListenerContainerFactory` or directly in the `MessageListenerContainer`: - -[source, java] ----- -@Bean -public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory() { - return SqsMessageListenerContainerFactory - .builder() - .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient) - .errorHandler(ExponentialBackoffErrorHandlerWithFullJitter - .builder() - .initialVisibilityTimeoutSeconds(1) - .multiplier(2) - .maxVisibilityTimeoutSeconds(10) - .build()) - .build(); -} ----- +The handler can then be passed to a factory or container as needed. -==== Exponential Backoff Half Jitter Error Handler -This variant also computes the visibility timeout exponentially but applies -*half jitter*. The exponential delay is halved and a random value between zero -and this half is added to it. The resulting timeout is then used to change the -message visibility. +====== Half Jitter +Computes the visibility timeout exponentially and applies *half jitter*. A random value is selected uniformly between ceil(timeout/2) and timeout. +The selected value becomes the new visibility timeout, providing moderate randomization while maintaining a reasonable minimum delay. 
[cols="2,3,1,1"] |=== @@ -1480,41 +1447,20 @@ NOTE: The maximum visibility timeout allowed by SQS is 43200 seconds (12 hours). If the value provided to the `maxVisibilityTimeoutSeconds` parameter exceeds this limit, an `IllegalArgumentException` will be thrown. -When using auto-configured factory, simply declare a `@Bean` and the error -handler will be set +To enable half jitter, `jitter(Jitter.HALF)` should be set on the builder: [source, java] ---- -@Bean -public ExponentialBackoffErrorHandlerWithHalfJitter asyncErrorHandler() { - return ExponentialBackoffErrorHandlerWithHalfJitter - .builder() - .initialVisibilityTimeoutSeconds(1) - .multiplier(2) - .maxVisibilityTimeoutSeconds(10) - .build(); -} +ExponentialBackoffErrorHandler handler = ExponentialBackoffErrorHandler + .builder() + .initialVisibilityTimeoutSeconds(1) + .multiplier(2) + .maxVisibilityTimeoutSeconds(10) + .jitter(Jitter.HALF) + .build(); ---- -Alternatively, `ExponentialBackoffErrorHandlerWithHalfJitter` can be set in the -`MessageListenerContainerFactory` or directly in the `MessageListenerContainer`: - -[source, java] ----- -@Bean -public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory() { - return SqsMessageListenerContainerFactory - .builder() - .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient) - .errorHandler(ExponentialBackoffErrorHandlerWithHalfJitter - .builder() - .initialVisibilityTimeoutSeconds(1) - .multiplier(2) - .maxVisibilityTimeoutSeconds(10) - .build()) - .build(); -} ----- +The handler can then be passed to a factory or container as needed. 
==== Linear Backoff Error Handler `LinearBackoffErrorHandler` increases the visibility timeout linearly whenever a diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandler.java index b3f022ccc..ed4e2ca54 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandler.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandler.java @@ -19,7 +19,10 @@ import io.awspring.cloud.sqs.listener.BatchVisibility; import io.awspring.cloud.sqs.listener.Visibility; import java.util.Collection; +import java.util.Random; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; @@ -38,6 +41,8 @@ * * @author Bruno Garcia * @author Rafael Pavarini + * @author Tomaz Fernandes + * */ public class ExponentialBackoffErrorHandler implements AsyncErrorHandler { @@ -46,12 +51,16 @@ public class ExponentialBackoffErrorHandler implements AsyncErrorHandler { private final int initialVisibilityTimeoutSeconds; private final double multiplier; private final int maxVisibilityTimeoutSeconds; + private final Supplier randomSupplier; + private final Jitter jitter; private ExponentialBackoffErrorHandler(int initialVisibilityTimeoutSeconds, double multiplier, - int maxVisibilityTimeoutSeconds) { + int maxVisibilityTimeoutSeconds, Supplier randomSupplier, Jitter jitter) { this.initialVisibilityTimeoutSeconds = initialVisibilityTimeoutSeconds; this.multiplier = multiplier; this.maxVisibilityTimeoutSeconds = maxVisibilityTimeoutSeconds; + this.randomSupplier = randomSupplier; + this.jitter = jitter; } @Override @@ -104,8 +113,12 @@ 
private int calculateTimeout(Message message) { } private int calculateTimeout(long receiveMessageCount) { - return ErrorHandlerVisibilityHelper.calculateVisibilityTimeoutExponentially(receiveMessageCount, + int timeout = ErrorHandlerVisibilityHelper.calculateVisibilityTimeoutExponentially(receiveMessageCount, initialVisibilityTimeoutSeconds, multiplier, maxVisibilityTimeoutSeconds); + int jitteredTimeout = jitter.applyJitter(new Jitter.Context(timeout, randomSupplier)); + logger.debug("Exponential backoff jitter applied: original={}, jittered={}, receiveCount={}, jitterType={}", + timeout, jitteredTimeout, receiveMessageCount, jitter.getClass().getSimpleName()); + return jitteredTimeout; } public static Builder builder() { @@ -117,6 +130,8 @@ public static class Builder { private int initialVisibilityTimeoutSeconds = BackoffVisibilityConstants.DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS; private double multiplier = BackoffVisibilityConstants.DEFAULT_MULTIPLIER; private int maxVisibilityTimeoutSeconds = BackoffVisibilityConstants.DEFAULT_MAX_VISIBILITY_TIMEOUT_SECONDS; + private Supplier randomSupplier = ThreadLocalRandom::current; + private Jitter jitter = Jitter.NONE; public Builder initialVisibilityTimeoutSeconds(int initialVisibilityTimeoutSeconds) { ErrorHandlerVisibilityHelper.checkVisibilityTimeout(initialVisibilityTimeoutSeconds); @@ -137,11 +152,23 @@ public Builder maxVisibilityTimeoutSeconds(int maxVisibilityTimeoutSeconds) { return this; } + public Builder randomSupplier(Supplier randomSupplier) { + Assert.notNull(randomSupplier, "randomSupplier cannot be null"); + this.randomSupplier = randomSupplier; + return this; + } + + public Builder jitter(Jitter jitter) { + Assert.notNull(jitter, "jitter cannot be null"); + this.jitter = jitter; + return this; + } + public ExponentialBackoffErrorHandler build() { Assert.isTrue(initialVisibilityTimeoutSeconds <= maxVisibilityTimeoutSeconds, "Initial visibility timeout must not exceed max visibility timeout"); 
return new ExponentialBackoffErrorHandler<>(initialVisibilityTimeoutSeconds, multiplier, - maxVisibilityTimeoutSeconds); + maxVisibilityTimeoutSeconds, randomSupplier, jitter); } } } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandlerWithFullJitter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandlerWithFullJitter.java deleted file mode 100644 index 9a23b79d6..000000000 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandlerWithFullJitter.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright 2013-2025 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.awspring.cloud.sqs.listener.errorhandler; - -import io.awspring.cloud.sqs.MessageHeaderUtils; -import io.awspring.cloud.sqs.listener.BatchVisibility; -import io.awspring.cloud.sqs.listener.Visibility; -import java.util.Collection; -import java.util.Random; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Supplier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.messaging.Message; -import org.springframework.util.Assert; - -/** - * An implementation of an Exponential Backoff with full jitter error handler for asynchronous message processing. - * - *

- * This error handler sets the SQS message visibility timeout exponentially based on the number of received attempts - * whenever an exception occurs. - * - *

- * When AcknowledgementMode is set to ON_SUCCESS (the default), returning a failed future prevents the message from - * being acknowledged. - * - * @author Bruno Garcia - * @author Rafael Pavarini - */ -public class ExponentialBackoffErrorHandlerWithFullJitter implements AsyncErrorHandler { - private static final Logger logger = LoggerFactory.getLogger(ExponentialBackoffErrorHandlerWithFullJitter.class); - - private final int initialVisibilityTimeoutSeconds; - private final double multiplier; - private final int maxVisibilityTimeoutSeconds; - private final Supplier randomSupplier; - - private ExponentialBackoffErrorHandlerWithFullJitter(int initialVisibilityTimeoutSeconds, double multiplier, - int maxVisibilityTimeoutSeconds, Supplier randomSupplier) { - this.initialVisibilityTimeoutSeconds = initialVisibilityTimeoutSeconds; - this.multiplier = multiplier; - this.maxVisibilityTimeoutSeconds = maxVisibilityTimeoutSeconds; - this.randomSupplier = randomSupplier; - } - - @Override - public CompletableFuture handle(Message message, Throwable t) { - return applyExponentialBackoffVisibilityTimeoutWithFullJitter(message) - .thenCompose(theVoid -> CompletableFuture.failedFuture(t)); - } - - @Override - public CompletableFuture handle(Collection> messages, Throwable t) { - return applyExponentialBackoffVisibilityTimeoutWithFullJitter(messages) - .thenCompose(theVoid -> CompletableFuture.failedFuture(t)); - } - - private CompletableFuture applyExponentialBackoffVisibilityTimeoutWithFullJitter( - Collection> messages) { - CompletableFuture[] futures = ErrorHandlerVisibilityHelper.groupMessagesByReceiveMessageCount(messages) - .entrySet().stream().map(entry -> { - int timeout = calculateTimeout(entry.getKey()); - return applyBatchVisibilityChange(entry.getValue(), timeout); - }).toArray(CompletableFuture[]::new); - - return CompletableFuture.allOf(futures); - } - - private CompletableFuture applyBatchVisibilityChange(Collection> messages, int timeout) { - 
logger.debug("Changing batch visibility timeout to {} - Messages Id {}", timeout, - MessageHeaderUtils.getId(messages)); - BatchVisibility visibility = ErrorHandlerVisibilityHelper.getVisibility(messages); - return visibility.changeToAsync(timeout).exceptionallyCompose(throwable -> { - logger.warn("Failed to change batch visibility timeout to {} - Messages Id {}", timeout, - MessageHeaderUtils.getId(messages), throwable); - return CompletableFuture.failedFuture(throwable); - }); - } - - private CompletableFuture applyExponentialBackoffVisibilityTimeoutWithFullJitter(Message message) { - int timeout = calculateTimeout(message); - Visibility visibility = ErrorHandlerVisibilityHelper.getVisibility(message); - logger.debug("Changing visibility timeout to {} - Message Id {}", timeout, message.getHeaders().getId()); - return visibility.changeToAsync(timeout).exceptionallyCompose(throwable -> { - logger.warn("Failed to change visibility timeout to {} - Message Id {}", timeout, - message.getHeaders().getId(), throwable); - return CompletableFuture.failedFuture(throwable); - }); - } - - private int calculateTimeout(Message message) { - long receiveMessageCount = ErrorHandlerVisibilityHelper.getReceiveMessageCount(message); - return calculateTimeout(receiveMessageCount); - } - - private int calculateTimeout(long receiveMessageCount) { - int timeout = ErrorHandlerVisibilityHelper.calculateVisibilityTimeoutExponentially(receiveMessageCount, - initialVisibilityTimeoutSeconds, multiplier, maxVisibilityTimeoutSeconds); - return randomSupplier.get().nextInt(timeout + 1); - } - - public static ExponentialBackoffErrorHandlerWithFullJitter.Builder builder() { - return new ExponentialBackoffErrorHandlerWithFullJitter.Builder<>(); - } - - public static class Builder { - - private int initialVisibilityTimeoutSeconds = BackoffVisibilityConstants.DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS; - private double multiplier = BackoffVisibilityConstants.DEFAULT_MULTIPLIER; - private int 
maxVisibilityTimeoutSeconds = BackoffVisibilityConstants.DEFAULT_MAX_VISIBILITY_TIMEOUT_SECONDS; - private Supplier randomSupplier = ThreadLocalRandom::current; - - public ExponentialBackoffErrorHandlerWithFullJitter.Builder initialVisibilityTimeoutSeconds( - int initialVisibilityTimeoutSeconds) { - ErrorHandlerVisibilityHelper.checkVisibilityTimeout(initialVisibilityTimeoutSeconds); - this.initialVisibilityTimeoutSeconds = initialVisibilityTimeoutSeconds; - return this; - } - - public ExponentialBackoffErrorHandlerWithFullJitter.Builder multiplier(double multiplier) { - Assert.isTrue(multiplier >= 1, - () -> "Invalid multiplier '" + multiplier + "'. Should be greater than " + "or equal to 1."); - this.multiplier = multiplier; - return this; - } - - public ExponentialBackoffErrorHandlerWithFullJitter.Builder maxVisibilityTimeoutSeconds( - int maxVisibilityTimeoutSeconds) { - ErrorHandlerVisibilityHelper.checkVisibilityTimeout(maxVisibilityTimeoutSeconds); - this.maxVisibilityTimeoutSeconds = maxVisibilityTimeoutSeconds; - return this; - } - - public ExponentialBackoffErrorHandlerWithFullJitter.Builder randomSupplier(Supplier randomSupplier) { - Assert.notNull(randomSupplier, "randomSupplier cannot be null"); - this.randomSupplier = randomSupplier; - return this; - } - - public ExponentialBackoffErrorHandlerWithFullJitter build() { - Assert.isTrue(initialVisibilityTimeoutSeconds <= maxVisibilityTimeoutSeconds, - "Initial visibility timeout must not exceed max visibility timeout"); - return new ExponentialBackoffErrorHandlerWithFullJitter<>(initialVisibilityTimeoutSeconds, multiplier, - maxVisibilityTimeoutSeconds, randomSupplier); - } - } -} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandlerWithHalfJitter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandlerWithHalfJitter.java deleted file mode 100644 index d24721611..000000000 --- 
a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandlerWithHalfJitter.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Copyright 2013-2025 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.awspring.cloud.sqs.listener.errorhandler; - -import io.awspring.cloud.sqs.MessageHeaderUtils; -import io.awspring.cloud.sqs.listener.BatchVisibility; -import io.awspring.cloud.sqs.listener.Visibility; -import java.util.Collection; -import java.util.Random; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Supplier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.messaging.Message; -import org.springframework.util.Assert; - -/** - * An implementation of an Exponential Backoff with half-jitter error handler for asynchronous message processing. - * - *

- * This error handler sets the SQS message visibility timeout exponentially based on the number of received attempts - * whenever an exception occurs. - * - *

- * When AcknowledgementMode is set to ON_SUCCESS (the default), returning a failed future prevents the message from - * being acknowledged. - * - * @author Bruno Garcia - * @author Rafael Pavarini - */ -public class ExponentialBackoffErrorHandlerWithHalfJitter implements AsyncErrorHandler { - private static final Logger logger = LoggerFactory.getLogger(ExponentialBackoffErrorHandlerWithHalfJitter.class); - - private final int initialVisibilityTimeoutSeconds; - private final double multiplier; - private final int maxVisibilityTimeoutSeconds; - private final Supplier randomSupplier; - - private ExponentialBackoffErrorHandlerWithHalfJitter(int initialVisibilityTimeoutSeconds, double multiplier, - int maxVisibilityTimeoutSeconds, Supplier randomSupplier) { - this.initialVisibilityTimeoutSeconds = initialVisibilityTimeoutSeconds; - this.multiplier = multiplier; - this.maxVisibilityTimeoutSeconds = maxVisibilityTimeoutSeconds; - this.randomSupplier = randomSupplier; - } - - @Override - public CompletableFuture handle(Message message, Throwable t) { - return applyExponentialBackoffVisibilityTimeoutWithHalfJitter(message) - .thenCompose(theVoid -> CompletableFuture.failedFuture(t)); - } - - @Override - public CompletableFuture handle(Collection> messages, Throwable t) { - return applyExponentialBackoffVisibilityTimeoutWithHalfJitter(messages) - .thenCompose(theVoid -> CompletableFuture.failedFuture(t)); - } - - private CompletableFuture applyExponentialBackoffVisibilityTimeoutWithHalfJitter( - Collection> messages) { - CompletableFuture[] futures = ErrorHandlerVisibilityHelper.groupMessagesByReceiveMessageCount(messages) - .entrySet().stream().map(entry -> { - int timeout = calculateTimeout(entry.getKey()); - return applyBatchVisibilityChange(entry.getValue(), timeout); - }).toArray(CompletableFuture[]::new); - - return CompletableFuture.allOf(futures); - } - - private CompletableFuture applyBatchVisibilityChange(Collection> messages, int timeout) { - 
logger.debug("Changing batch visibility timeout to {} - Messages Id {}", timeout, - MessageHeaderUtils.getId(messages)); - BatchVisibility visibility = ErrorHandlerVisibilityHelper.getVisibility(messages); - return visibility.changeToAsync(timeout).exceptionallyCompose(throwable -> { - logger.warn("Failed to change batch visibility timeout to {} - Messages Id {}", timeout, - MessageHeaderUtils.getId(messages), throwable); - return CompletableFuture.failedFuture(throwable); - }); - } - - private CompletableFuture applyExponentialBackoffVisibilityTimeoutWithHalfJitter(Message message) { - int timeout = calculateTimeout(message); - Visibility visibility = ErrorHandlerVisibilityHelper.getVisibility(message); - logger.debug("Changing visibility timeout to {} - Message Id {}", timeout, message.getHeaders().getId()); - return visibility.changeToAsync(timeout).exceptionallyCompose(throwable -> { - logger.warn("Failed to change visibility timeout to {} - Message Id {}", timeout, - message.getHeaders().getId(), throwable); - return CompletableFuture.failedFuture(throwable); - }); - } - - private int calculateTimeout(Message message) { - long receiveMessageCount = ErrorHandlerVisibilityHelper.getReceiveMessageCount(message); - return calculateTimeout(receiveMessageCount); - } - - private int calculateTimeout(long receiveMessageCount) { - int timeout = ErrorHandlerVisibilityHelper.calculateVisibilityTimeoutExponentially(receiveMessageCount, - initialVisibilityTimeoutSeconds, multiplier, maxVisibilityTimeoutSeconds); - int half = timeout / 2; - int jitter = randomSupplier.get().nextInt(0, half + 1); - return half + jitter; - } - - public static ExponentialBackoffErrorHandlerWithHalfJitter.Builder builder() { - return new ExponentialBackoffErrorHandlerWithHalfJitter.Builder<>(); - } - - public static class Builder { - - private int initialVisibilityTimeoutSeconds = BackoffVisibilityConstants.DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS; - private double multiplier = 
BackoffVisibilityConstants.DEFAULT_MULTIPLIER; - private int maxVisibilityTimeoutSeconds = BackoffVisibilityConstants.DEFAULT_MAX_VISIBILITY_TIMEOUT_SECONDS; - private Supplier randomSupplier = ThreadLocalRandom::current; - - public ExponentialBackoffErrorHandlerWithHalfJitter.Builder randomSupplier(Supplier randomSupplier) { - Assert.notNull(randomSupplier, "randomSupplier cannot be null"); - this.randomSupplier = randomSupplier; - return this; - } - - public ExponentialBackoffErrorHandlerWithHalfJitter.Builder initialVisibilityTimeoutSeconds( - int initialVisibilityTimeoutSeconds) { - ErrorHandlerVisibilityHelper.checkVisibilityTimeout(initialVisibilityTimeoutSeconds); - this.initialVisibilityTimeoutSeconds = initialVisibilityTimeoutSeconds; - return this; - } - - public ExponentialBackoffErrorHandlerWithHalfJitter.Builder multiplier(double multiplier) { - Assert.isTrue(multiplier >= 1, - () -> "Invalid multiplier '" + multiplier + "'. Should be greater than " + "or equal to 1."); - this.multiplier = multiplier; - return this; - } - - public ExponentialBackoffErrorHandlerWithHalfJitter.Builder maxVisibilityTimeoutSeconds( - int maxVisibilityTimeoutSeconds) { - ErrorHandlerVisibilityHelper.checkVisibilityTimeout(maxVisibilityTimeoutSeconds); - this.maxVisibilityTimeoutSeconds = maxVisibilityTimeoutSeconds; - return this; - } - - public ExponentialBackoffErrorHandlerWithHalfJitter build() { - Assert.isTrue(initialVisibilityTimeoutSeconds <= maxVisibilityTimeoutSeconds, - "Initial visibility timeout must not exceed max visibility timeout"); - return new ExponentialBackoffErrorHandlerWithHalfJitter<>(initialVisibilityTimeoutSeconds, multiplier, - maxVisibilityTimeoutSeconds, randomSupplier); - } - } -} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/Jitter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/Jitter.java new file mode 100644 index 000000000..10d9746c6 --- /dev/null +++ 
b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/Jitter.java @@ -0,0 +1,90 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.listener.errorhandler; + +import java.util.Random; +import java.util.function.Supplier; +import org.springframework.util.Assert; + +/** + * Strategy interface for applying jitter to exponential backoff calculations. + * + * @author Bruno Garcia + * @author Rafael Pavarini + * @author Tomaz Fernandes + */ +public interface Jitter { + + /** + * Apply jitter to the calculated timeout value. + * + * @param context the jitter context containing timeout and random supplier + * @return the timeout value with jitter applied + */ + int applyJitter(Context context); + + /** + * Context for jitter calculations. + */ + class Context { + private final int timeout; + private final Supplier randomSupplier; + + /** + * Create a new Context instance. 
+ * + * @param timeout the timeout value (must be >= 1) + * @param randomSupplier the random supplier (must not be null) + */ + public Context(int timeout, Supplier randomSupplier) { + Assert.isTrue(timeout >= 1, () -> "Timeout must be >= 1, but was " + timeout); + Assert.notNull(randomSupplier, "Random supplier cannot be null"); + this.timeout = timeout; + this.randomSupplier = randomSupplier; + } + + public int getTimeout() { + return timeout; + } + + public Supplier getRandomSupplier() { + return randomSupplier; + } + } + + /** + * No jitter strategy - returns the original timeout value. The original timeout value is expected to be valid + * (greater than 0) as it's validated in the builder. + */ + Jitter NONE = Context::getTimeout; + + /** + * Full jitter strategy - returns a random value between 0 and the original timeout. Ensures the result is at least + * 1 to avoid invalid timeout values. + */ + Jitter FULL = context -> Math.max(1, context.getRandomSupplier().get().nextInt(context.getTimeout() + 1)); + + /** + * Half jitter strategy - returns a value uniformly between ceil(timeout/2) and timeout. Ensures the result is at + * least 1 to avoid invalid timeout values. 
+ */ + Jitter HALF = context -> { + int timeout = context.getTimeout(); + int lowerBound = Math.max(1, (int) Math.ceil(timeout / 2.0)); + return context.getRandomSupplier().get().nextInt(lowerBound, timeout + 1); + }; + +} diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsErrorHandlerIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsErrorHandlerIntegrationTests.java index ffb45ac5c..38deb6975 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsErrorHandlerIntegrationTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsErrorHandlerIntegrationTests.java @@ -214,7 +214,7 @@ void receivesBatchMessageExponentialHalfJitterBackOffErrorHandler() throws Excep sqsTemplate.sendManyAsync(SUCCESS_EXPONENTIAL_HALF_JITTER_BACKOFF_ERROR_HANDLER_BATCH_QUEUE, messages); logger.debug("Sent message to queue {} with messageBody {}", SUCCESS_EXPONENTIAL_HALF_JITTER_BACKOFF_ERROR_HANDLER_BATCH_QUEUE, messages); - await().atLeast(50, TimeUnit.SECONDS).atMost(64, TimeUnit.SECONDS) + await().atLeast(32, TimeUnit.SECONDS).atMost(64, TimeUnit.SECONDS) .until(() -> latchContainer.receivesRetryBatchMessageHalfJitterLatch.getCount() == 0); } @@ -638,10 +638,11 @@ public SqsMessageListenerContainerFactory exponentialBackOffFullJitterEr .maxMessagesPerPoll(10) .queueAttributeNames(Collections.singletonList(QueueAttributeName.QUEUE_ARN)) .maxDelayBetweenPolls(Duration.ofSeconds(15))) - .errorHandler(ExponentialBackoffErrorHandlerWithFullJitter.builder() + .errorHandler(ExponentialBackoffErrorHandler.builder() .initialVisibilityTimeoutSeconds(ExponentialBackOffJitterErrorHandlerListener.initialValueSeconds) .multiplier(ExponentialBackOffJitterErrorHandlerListener.multiplier) .randomSupplier(() -> new MockedRandomNextInt(getRandomFunction())) + .jitter(Jitter.FULL) .build()) .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient) .build(); @@ 
-659,10 +660,11 @@ public SqsMessageListenerContainerFactory exponentialBackOffHalfJitterEr .maxMessagesPerPoll(10) .queueAttributeNames(Collections.singletonList(QueueAttributeName.QUEUE_ARN)) .maxDelayBetweenPolls(Duration.ofSeconds(15))) - .errorHandler(ExponentialBackoffErrorHandlerWithHalfJitter.builder() + .errorHandler(ExponentialBackoffErrorHandler.builder() .initialVisibilityTimeoutSeconds(ExponentialBackOffJitterErrorHandlerListener.initialValueSeconds) .multiplier(ExponentialBackOffJitterErrorHandlerListener.multiplier) .randomSupplier(() -> new MockedRandomNextInt(getRandomFunction())) + .jitter(Jitter.HALF) .build()) .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient) .build(); @@ -774,13 +776,7 @@ private static long calculateJitterTotalElapsedExpectedTime(long receiveCount, b for (int i = 0; i < receiveCount - 1; i++) { long timeout = (long) (ExponentialBackOffJitterErrorHandlerListener.initialValueSeconds * Math.pow(ExponentialBackOffJitterErrorHandlerListener.multiplier, i)); - if (halfJitter) { - long half = timeout / 2; - sum += half + (half + 1) / 2; - } - else { - sum += (timeout + 1) / 2; - } + sum += timeout / 2; } return sum; } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/errorhandler/BaseExponentialBackoffErrorHandlerJitterTest.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/errorhandler/BaseExponentialBackoffErrorHandlerJitterTest.java index f07b70785..aff48fc6f 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/errorhandler/BaseExponentialBackoffErrorHandlerJitterTest.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/errorhandler/BaseExponentialBackoffErrorHandlerJitterTest.java @@ -62,39 +62,39 @@ BaseTestCase VisibilityTimeoutExpectedFullJitter(int VisibilityTimeoutExpectedFu } CompletableFuture calculateWithVisibilityTimeoutExpectedHalfJitter(Message message, Throwable t) { - 
ExponentialBackoffErrorHandlerWithHalfJitter handler = ExponentialBackoffErrorHandlerWithHalfJitter - .builder().randomSupplier(this.randomSupplier) + ExponentialBackoffErrorHandler handler = ExponentialBackoffErrorHandler.builder() + .randomSupplier(this.randomSupplier) .initialVisibilityTimeoutSeconds(this.initialVisibilityTimeoutSeconds).multiplier(this.multiplier) - .build(); + .jitter(Jitter.HALF).build(); return handler.handle(message, t); } CompletableFuture calculateWithVisibilityTimeoutExpectedHalfJitter(Collection> messages, Throwable t) { - ExponentialBackoffErrorHandlerWithHalfJitter handler = ExponentialBackoffErrorHandlerWithHalfJitter - .builder().randomSupplier(this.randomSupplier) + ExponentialBackoffErrorHandler handler = ExponentialBackoffErrorHandler.builder() + .randomSupplier(this.randomSupplier) .initialVisibilityTimeoutSeconds(this.initialVisibilityTimeoutSeconds).multiplier(this.multiplier) - .build(); + .jitter(Jitter.HALF).build(); return handler.handle(messages, t); } CompletableFuture calculateWithVisibilityTimeoutExpectedFullJitter(Message message, Throwable t) { - ExponentialBackoffErrorHandlerWithFullJitter handler = ExponentialBackoffErrorHandlerWithFullJitter - .builder().randomSupplier(this.randomSupplier) + ExponentialBackoffErrorHandler handler = ExponentialBackoffErrorHandler.builder() + .randomSupplier(this.randomSupplier) .initialVisibilityTimeoutSeconds(this.initialVisibilityTimeoutSeconds).multiplier(this.multiplier) - .build(); + .jitter(Jitter.FULL).build(); return handler.handle(message, t); } CompletableFuture calculateWithVisibilityTimeoutExpectedFullJitter(Collection> messages, Throwable t) { - ExponentialBackoffErrorHandlerWithFullJitter handler = ExponentialBackoffErrorHandlerWithFullJitter - .builder().randomSupplier(this.randomSupplier) + ExponentialBackoffErrorHandler handler = ExponentialBackoffErrorHandler.builder() + .randomSupplier(this.randomSupplier) 
.initialVisibilityTimeoutSeconds(this.initialVisibilityTimeoutSeconds).multiplier(this.multiplier) - .build(); + .jitter(Jitter.FULL).build(); return handler.handle(messages, t); } @@ -114,7 +114,7 @@ public int nextInt(int bound) { @Override public int nextInt(int origin, int bound) { - return nextInt.apply(bound); + return nextInt.apply(bound - origin) + origin; } } } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandlerJitterTest.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandlerJitterTest.java index ef330c3a5..02903e2e6 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandlerJitterTest.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandlerJitterTest.java @@ -36,8 +36,7 @@ import org.springframework.messaging.MessageHeaders; /** - * Tests for {@link ExponentialBackoffErrorHandlerWithFullJitter} and - * {@link ExponentialBackoffErrorHandlerWithHalfJitter}. + * Tests for {@link ExponentialBackoffErrorHandler} with different {@link Jitter} implementations. 
* * @author Bruno Garcia * @author Rafael Pavarini @@ -143,10 +142,8 @@ void calculateExponentialHalfJitter(BaseTestCase baseTestCase) { private static Collection testCases() { return List.of( - baseTestCaseMidRandomSupplier().sqsApproximateReceiveCount("1").VisibilityTimeoutExpectedHalfJitter( - (int) ((BackoffVisibilityConstants.DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS * 1.5) / 2)) - .VisibilityTimeoutExpectedFullJitter( - BackoffVisibilityConstants.DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS / 2), + baseTestCaseMidRandomSupplier().sqsApproximateReceiveCount("1").VisibilityTimeoutExpectedHalfJitter(75) + .VisibilityTimeoutExpectedFullJitter(50), baseTestCaseMidRandomSupplier().sqsApproximateReceiveCount("2").VisibilityTimeoutExpectedHalfJitter(150) .VisibilityTimeoutExpectedFullJitter(100), @@ -161,20 +158,13 @@ private static Collection testCases() { .VisibilityTimeoutExpectedHalfJitter(4800).VisibilityTimeoutExpectedFullJitter(3200), baseTestCaseMidRandomSupplier().sqsApproximateReceiveCount("11") - .VisibilityTimeoutExpectedHalfJitter(Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS / 2 - + Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS / 4) - .VisibilityTimeoutExpectedFullJitter(Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS / 2), + .VisibilityTimeoutExpectedHalfJitter(32400).VisibilityTimeoutExpectedFullJitter(21600), baseTestCaseMidRandomSupplier().sqsApproximateReceiveCount("13") - .VisibilityTimeoutExpectedHalfJitter(Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS / 2 - + Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS / 4) - .VisibilityTimeoutExpectedFullJitter(Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS / 2), + .VisibilityTimeoutExpectedHalfJitter(32400).VisibilityTimeoutExpectedFullJitter(21600), - baseTestCaseMaxRandomSupplier().sqsApproximateReceiveCount("1") - .VisibilityTimeoutExpectedHalfJitter( - BackoffVisibilityConstants.DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS) - .VisibilityTimeoutExpectedFullJitter( - 
BackoffVisibilityConstants.DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS), + baseTestCaseMaxRandomSupplier().sqsApproximateReceiveCount("1").VisibilityTimeoutExpectedHalfJitter(100) + .VisibilityTimeoutExpectedFullJitter(100), baseTestCaseMaxRandomSupplier().sqsApproximateReceiveCount("2").VisibilityTimeoutExpectedHalfJitter(200) .VisibilityTimeoutExpectedFullJitter(200), @@ -189,12 +179,10 @@ private static Collection testCases() { .VisibilityTimeoutExpectedHalfJitter(6400).VisibilityTimeoutExpectedFullJitter(6400), baseTestCaseMaxRandomSupplier().sqsApproximateReceiveCount("11") - .VisibilityTimeoutExpectedHalfJitter(Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS) - .VisibilityTimeoutExpectedFullJitter(Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS), + .VisibilityTimeoutExpectedHalfJitter(43200).VisibilityTimeoutExpectedFullJitter(43200), baseTestCaseMaxRandomSupplier().sqsApproximateReceiveCount("13") - .VisibilityTimeoutExpectedHalfJitter(Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS) - .VisibilityTimeoutExpectedFullJitter(Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS)); + .VisibilityTimeoutExpectedHalfJitter(43200).VisibilityTimeoutExpectedFullJitter(43200)); } private static BaseTestCase baseTestCaseMidRandomSupplier() {