Skip to content

Commit

Permalink
spring-projectsGH-2742: Exit batch retries when container is paused (s…
Browse files Browse the repository at this point in the history
…pring-projects#2743)

* spring-projectsGH-2742: Exit batch retries when container is paused

* spring-projectsGH-2742: fix code style spaces

* spring-projectsGH-2742: better method name for what it does
  • Loading branch information
antonio-tomac committed Jul 16, 2023
1 parent bb34b3e commit b5506ee
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
*/
public final class ErrorHandlingUtils {

static Runnable NO_OP = () -> { };

private ErrorHandlingUtils() {
}

Expand Down Expand Up @@ -126,25 +128,35 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
consumer.poll(Duration.ZERO);
}
catch (WakeupException we) {
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
throw new KafkaException("Woken up during retry", logLevel, we);
}
try {
ListenerUtils.stoppableSleep(container, nextBackOff);
ListenerUtils.conditionalSleep(
() -> container.isRunning() &&
!container.isPauseRequested() &&
records.partitions().stream().noneMatch(container::isPartitionPauseRequested),
nextBackOff
);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
throw new KafkaException("Interrupted during retry", logLevel, e1);
}
if (!container.isRunning()) {
throw new KafkaException("Container stopped during retries");
}
if (container.isPauseRequested() ||
records.partitions().stream().anyMatch(container::isPartitionPauseRequested)) {
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
throw new KafkaException("Container paused requested during retries");
}
try {
consumer.poll(Duration.ZERO);
}
catch (WakeupException we) {
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
throw new KafkaException("Woken up during retry", logLevel, we);
}
try {
Expand Down Expand Up @@ -176,7 +188,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
catch (Exception ex) {
logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch");
retryListeners.forEach(listener -> listener.recoveryFailed(records, thrownException, ex));
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
}
}
finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
Expand All @@ -39,6 +40,7 @@
*
* @author Gary Russell
* @author Francois Rosiere
* @author Antonio Tomac
* @since 2.0
*
*/
Expand Down Expand Up @@ -185,11 +187,22 @@ public static void unrecoverableBackOff(BackOff backOff, ThreadLocal<BackOffExec
* @since 2.7
*/
public static void stoppableSleep(MessageListenerContainer container, long interval) throws InterruptedException {
conditionalSleep(container::isRunning, interval);
}

/**
* Sleep for the desired timeout, as long as shouldSleepCondition supplies true.
* @param shouldSleepCondition to.
* @param interval the timeout.
* @throws InterruptedException if the thread is interrupted.
* @since 3.0.9
*/
public static void conditionalSleep(Supplier<Boolean> shouldSleepCondition, long interval) throws InterruptedException {
long timeout = System.currentTimeMillis() + interval;
long sleepInterval = interval > SMALL_INTERVAL_THRESHOLD ? DEFAULT_SLEEP_INTERVAL : SMALL_SLEEP_INTERVAL;
do {
Thread.sleep(sleepInterval);
if (!container.isRunning()) {
if (!shouldSleepCondition.get()) {
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

/**
* @author Antonio Tomac
* @since 3.0.9
*
*/
class ErrorHandlingUtilsTest {

private final Exception thrownException = new RuntimeException("initial cause");
private final Consumer<?, ?> consumer = mock(Consumer.class);
private final MessageListenerContainer container = mock(MessageListenerContainer.class);
private final Runnable listener = mock(Runnable.class);
private final BackOff backOff = new FixedBackOff(1000, 3);
private final CommonErrorHandler seeker = mock(CommonErrorHandler.class);
@SuppressWarnings("unchecked")
private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer = mock(BiConsumer.class);
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(ErrorHandlingUtilsTest.class));
private final List<RetryListener> retryListeners = new ArrayList<>();
private final BinaryExceptionClassifier classifier = BinaryExceptionClassifier.defaultClassifier();

private final ConsumerRecords<?, ?> consumerRecords = recordsOf(
new ConsumerRecord<>("foo", 0, 0L, "a", "a"),
new ConsumerRecord<>("foo", 1, 0L, "b", "b")
);

@SafeVarargs
private <K, V> ConsumerRecords<K, V> recordsOf(ConsumerRecord<K, V>... records) {
return new ConsumerRecords<>(
Arrays.stream(records).collect(Collectors.groupingBy(
(cr) -> new TopicPartition(cr.topic(), cr.partition())
))
);
}

@BeforeEach
public void resetMocks() {
reset(consumer, container, listener, seeker, recoverer);
willReturn(true).given(container).isRunning();
willReturn(false).given(container).isPauseRequested();
}

private void doRetries() {
ErrorHandlingUtils.retryBatch(
thrownException, consumerRecords, consumer, container, listener, backOff,
seeker, recoverer, logger, KafkaException.Level.INFO, retryListeners,
classifier, true
);
}

private long execDurationOf(Runnable runnable) {
long start = System.currentTimeMillis();
runnable.run();
long end = System.currentTimeMillis();
return end - start;
}

@Test
void testStopRetriesWhenNotRunning() {
willReturn(false).given(container).isRunning();
assertThatThrownBy(this::doRetries)
.isInstanceOf(KafkaException.class)
.message().isEqualTo("Container stopped during retries");
verifyNoInteractions(seeker, listener, recoverer);
}

@Test
void testOneSuccessfulRetry() {
long duration = execDurationOf(this::doRetries);
assertThat(duration).as("duration of one round of sleep").isGreaterThanOrEqualTo(1000L);
verifyNoInteractions(seeker, recoverer);
verify(listener, times(1)).run();
verifyNoInteractions(seeker, recoverer);
}

@Test
void stopRetriesWhenContainerIsPaused() {
willReturn(true).given(container).isPauseRequested();
long duration = execDurationOf(() ->
assertThatThrownBy(this::doRetries)
.isInstanceOf(KafkaException.class)
.message().isEqualTo("Container paused requested during retries")
);
assertThat(duration)
.as("duration should not be full retry interval")
.isLessThan(1000L);
verify(seeker).handleBatch(thrownException, consumerRecords, consumer, container, ErrorHandlingUtils.NO_OP);
verifyNoInteractions(listener, recoverer);
}

@Test
void stopRetriesWhenPartitionIsPaused() {
willReturn(true).given(container).isPartitionPauseRequested(new TopicPartition("foo", 1));
long duration = execDurationOf(() ->
assertThatThrownBy(this::doRetries)
.isInstanceOf(KafkaException.class)
.message().isEqualTo("Container paused requested during retries")
);
assertThat(duration)
.as("duration should not be full retry interval")
.isLessThan(1000L);
verify(seeker).handleBatch(thrownException, consumerRecords, consumer, container, ErrorHandlingUtils.NO_OP);
verifyNoInteractions(listener, recoverer);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 the original author or authors.
* Copyright 2021-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
/**
* @author Gary Russell
* @author Francois Rosiere
* @author Antonio Tomac
* @since 2.7.1
*
*/
Expand All @@ -40,6 +41,20 @@ void stoppableSleep() throws InterruptedException {
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(500);
}

@Test
void conditionalSleepWithConditionTrue() throws InterruptedException {
long t1 = System.currentTimeMillis();
ListenerUtils.conditionalSleep(() -> true, 500);
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(500);
}

@Test
void conditionalSleepWithConditionFalse() throws InterruptedException {
long t1 = System.currentTimeMillis();
ListenerUtils.conditionalSleep(() -> false, 500);
assertThat(System.currentTimeMillis() - t1).isLessThan(500);
}

@Test
void testCreationOfOffsetAndMetadataWithoutProvider() {
final MessageListenerContainer container = mock(MessageListenerContainer.class);
Expand Down

0 comments on commit b5506ee

Please sign in to comment.