diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index 0a2a2d5af8..f4901ea1ba 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -45,6 +45,8 @@ */ public final class ErrorHandlingUtils { + static Runnable NO_OP = () -> { }; + private ErrorHandlingUtils() { } @@ -126,25 +128,35 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r consumer.poll(Duration.ZERO); } catch (WakeupException we) { - seeker.handleBatch(thrownException, records, consumer, container, () -> { }); + seeker.handleBatch(thrownException, records, consumer, container, NO_OP); throw new KafkaException("Woken up during retry", logLevel, we); } try { - ListenerUtils.stoppableSleep(container, nextBackOff); + ListenerUtils.conditionalSleep( + () -> container.isRunning() && + !container.isPauseRequested() && + records.partitions().stream().noneMatch(container::isPartitionPauseRequested), + nextBackOff + ); } catch (InterruptedException e1) { Thread.currentThread().interrupt(); - seeker.handleBatch(thrownException, records, consumer, container, () -> { }); + seeker.handleBatch(thrownException, records, consumer, container, NO_OP); throw new KafkaException("Interrupted during retry", logLevel, e1); } if (!container.isRunning()) { throw new KafkaException("Container stopped during retries"); } + if (container.isPauseRequested() || + records.partitions().stream().anyMatch(container::isPartitionPauseRequested)) { + seeker.handleBatch(thrownException, records, consumer, container, NO_OP); + throw new KafkaException("Container paused requested during retries"); + } try { consumer.poll(Duration.ZERO); } catch (WakeupException we) { - seeker.handleBatch(thrownException, records, consumer, container, () -> { }); + seeker.handleBatch(thrownException, records, consumer, container, NO_OP); throw new KafkaException("Woken up during retry", logLevel, we); } try { @@ -176,7 +188,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r catch (Exception ex) { logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch"); retryListeners.forEach(listener -> listener.recoveryFailed(records, thrownException, ex)); - seeker.handleBatch(thrownException, records, consumer, container, () -> { }); + seeker.handleBatch(thrownException, records, consumer, container, NO_OP); } } finally { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java index d6f5652826..2eae8a4627 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectStreamClass; +import java.util.function.Supplier; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -39,6 +40,7 @@ * * @author Gary Russell * @author Francois Rosiere + * @author Antonio Tomac * @since 2.0 * */ @@ -185,11 +187,22 @@ public static void unrecoverableBackOff(BackOff backOff, ThreadLocal shouldSleepCondition, long interval) throws InterruptedException { long timeout = System.currentTimeMillis() + interval; long sleepInterval = interval > SMALL_INTERVAL_THRESHOLD ? DEFAULT_SLEEP_INTERVAL : SMALL_SLEEP_INTERVAL; do { Thread.sleep(sleepInterval); - if (!container.isRunning()) { + if (!shouldSleepCondition.get()) { break; } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingUtilsTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingUtilsTest.java new file mode 100644 index 0000000000..9fb8bd35d3 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingUtilsTest.java @@ -0,0 +1,150 @@ +/* + * Copyright 2017-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.classify.BinaryExceptionClassifier; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.KafkaException; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; + +/** + * @author Antonio Tomac + * @since 3.0.9 + * + */ +class ErrorHandlingUtilsTest { + + private final Exception thrownException = new RuntimeException("initial cause"); + private final Consumer consumer = mock(Consumer.class); + private final MessageListenerContainer container = mock(MessageListenerContainer.class); + private final Runnable listener = mock(Runnable.class); + private final BackOff backOff = new FixedBackOff(1000, 3); + private final CommonErrorHandler seeker = mock(CommonErrorHandler.class); + @SuppressWarnings("unchecked") + private final BiConsumer, Exception> recoverer = mock(BiConsumer.class); + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(ErrorHandlingUtilsTest.class)); + private final List retryListeners = new ArrayList<>(); + private final BinaryExceptionClassifier classifier = BinaryExceptionClassifier.defaultClassifier(); + + private final ConsumerRecords consumerRecords = recordsOf( + new ConsumerRecord<>("foo", 0, 0L, "a", "a"), + new ConsumerRecord<>("foo", 1, 0L, "b", "b") + ); + + @SafeVarargs + private ConsumerRecords recordsOf(ConsumerRecord... records) { + return new ConsumerRecords<>( + Arrays.stream(records).collect(Collectors.groupingBy( + (cr) -> new TopicPartition(cr.topic(), cr.partition()) + )) + ); + } + + @BeforeEach + public void resetMocks() { + reset(consumer, container, listener, seeker, recoverer); + willReturn(true).given(container).isRunning(); + willReturn(false).given(container).isPauseRequested(); + } + + private void doRetries() { + ErrorHandlingUtils.retryBatch( + thrownException, consumerRecords, consumer, container, listener, backOff, + seeker, recoverer, logger, KafkaException.Level.INFO, retryListeners, + classifier, true + ); + } + + private long execDurationOf(Runnable runnable) { + long start = System.currentTimeMillis(); + runnable.run(); + long end = System.currentTimeMillis(); + return end - start; + } + + @Test + void testStopRetriesWhenNotRunning() { + willReturn(false).given(container).isRunning(); + assertThatThrownBy(this::doRetries) + .isInstanceOf(KafkaException.class) + .message().isEqualTo("Container stopped during retries"); + verifyNoInteractions(seeker, listener, recoverer); + } + + @Test + void testOneSuccessfulRetry() { + long duration = execDurationOf(this::doRetries); + assertThat(duration).as("duration of one round of sleep").isGreaterThanOrEqualTo(1000L); + verifyNoInteractions(seeker, recoverer); + verify(listener, times(1)).run(); + verifyNoInteractions(seeker, recoverer); + } + + @Test + void stopRetriesWhenContainerIsPaused() { + willReturn(true).given(container).isPauseRequested(); + long duration = execDurationOf(() -> + assertThatThrownBy(this::doRetries) + .isInstanceOf(KafkaException.class) + .message().isEqualTo("Container paused requested during retries") + ); + assertThat(duration) + .as("duration should not be full retry interval") + .isLessThan(1000L); + verify(seeker).handleBatch(thrownException, consumerRecords, consumer, container, ErrorHandlingUtils.NO_OP); + verifyNoInteractions(listener, recoverer); + } + + @Test + void stopRetriesWhenPartitionIsPaused() { + willReturn(true).given(container).isPartitionPauseRequested(new TopicPartition("foo", 1)); + long duration = execDurationOf(() -> + assertThatThrownBy(this::doRetries) + .isInstanceOf(KafkaException.class) + .message().isEqualTo("Container paused requested during retries") + ); + assertThat(duration) + .as("duration should not be full retry interval") + .isLessThan(1000L); + verify(seeker).handleBatch(thrownException, consumerRecords, consumer, container, ErrorHandlingUtils.NO_OP); + verifyNoInteractions(listener, recoverer); + } +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java index c60ce19827..ae31c9d5b1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ /** * @author Gary Russell * @author Francois Rosiere + * @author Antonio Tomac * @since 2.7.1 * */ @@ -40,6 +41,20 @@ void stoppableSleep() throws InterruptedException { assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(500); } + @Test + void conditionalSleepWithConditionTrue() throws InterruptedException { + long t1 = System.currentTimeMillis(); + ListenerUtils.conditionalSleep(() -> true, 500); + assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(500); + } + + @Test + void conditionalSleepWithConditionFalse() throws InterruptedException { + long t1 = System.currentTimeMillis(); + ListenerUtils.conditionalSleep(() -> false, 500); + assertThat(System.currentTimeMillis() - t1).isLessThan(500); + } + @Test void testCreationOfOffsetAndMetadataWithoutProvider() { final MessageListenerContainer container = mock(MessageListenerContainer.class);