Skip to content

Commit

Permalink
Polishing pre review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Apr 27, 2022
1 parent 2c04ad6 commit db68ab7
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 22 deletions.
4 changes: 3 additions & 1 deletion spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ NOTE: You can have separate `ListenerContainerFactory` instances for the main an

==== Configuration

Since 2.9, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class instead of the regular `@EnableKafka` annotation, which it contains.
Starting with version 2.9, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class.
This enables the feature to bootstrap properly and gives access to injecting some of the feature's components to be looked up at runtime.
Also, to configure the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden.
For more details refer to <<retry-topic-global-settings>>.

NOTE: It is not necessary to also add `@EnableKafka`, if you add this annotation, because `@EnableKafkaRetryTopic` is meta-annotated with `@EnableKafka`.

===== Using the `@RetryableTopic` annotation

To configure the retry topic and dlt for a `@KafkaListener` annotated method, you just have to add the `@RetryableTopic` annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,13 @@
* }
* </pre>
*
* To configure the feature's components, extend the {@link RetryTopicConfigurationSupport}
* class and override the appropriate methods. Then import the subclass using the
* {@link Import @Import} annotation on a {@link Configuration @Configuration} class,
* such as:
* Using this annotation configures the default {@link RetryTopicConfigurationSupport}
* bean. This annotation is meta-annotated with {@code @EnableKafka} so it is not
* necessary to specify both.
*
* To configure the feature's components, extend the
* {@link RetryTopicConfigurationSupport} class and override the appropriate methods on a
* {@link Configuration @Configuration} class, such as:
*
* <pre class="code">
*
Expand All @@ -74,6 +77,7 @@
* .addToFatalExceptions(ShouldSkipBothRetriesException.class);
* }
* </pre>
* In this case, you should not use this annotation, use {@code @EnableKafka} instead.
*
* @author Tomaz Fernandes
* @since 2.9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
import java.time.Clock;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.KafkaBackOffManagerFactory;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.KafkaConsumerTimingAdjuster;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory;
Expand All @@ -41,7 +39,6 @@
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
import org.springframework.kafka.retrytopic.SuffixingRetryTopicNamesProviderFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* Provide the component instances that will be used with
Expand Down Expand Up @@ -170,15 +167,6 @@ public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRe
return new PartitionPausingBackOffManagerFactory(registry);
}

/**
* Create the {@link TaskExecutor} that will be used in the
* {@link KafkaConsumerTimingAdjuster}.
* @return the task executor.
*/
public TaskExecutor taskExecutor() {
return new ThreadPoolTaskExecutor();
}

/**
* Return the {@link Clock} instance that will be used for all
* time-related operations in the retry topic processes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
/**
* This is the main class providing the configuration behind the non-blocking,
* topic-based delayed retries feature. It is typically imported by adding
* {@link EnableKafkaRetryTopic @EnableRetryTopic} to an application
* {@link EnableKafkaRetryTopic @EnableKafkaRetryTopic} to an application
* {@link Configuration @Configuration} class. An alternative more advanced option
* is to extend directly from this class and override methods as necessary, remembering
* to add {@link Configuration @Configuration} to the subclass and {@link Bean @Bean}
Expand Down Expand Up @@ -305,11 +305,14 @@ private void configurePartitionPausingFactory(TaskExecutor taskExecutor,
Assert.isTrue(!configurer.timingAdjustmentEnabled
|| configurer.maxThreadPoolSize == null
|| ThreadPoolTaskExecutor.class.isAssignableFrom(taskExecutor.getClass()),
() -> "TaskExecutor must be an instance of ThreadPoolTaskExecutor to set maxThreadPoolSize");
"TaskExecutor must be an instance of ThreadPoolTaskExecutor to set maxThreadPoolSize");
factory.setTimingAdjustmentEnabled(configurer.timingAdjustmentEnabled);
if (ThreadPoolTaskExecutor.class.isAssignableFrom(taskExecutor.getClass())) {
JavaUtils.INSTANCE
.acceptIfNotNull(configurer.maxThreadPoolSize, poolSize -> ((ThreadPoolTaskExecutor) taskExecutor)
.setMaxPoolSize(poolSize));
}
JavaUtils.INSTANCE
.acceptIfNotNull(configurer.maxThreadPoolSize, poolSize -> ((ThreadPoolTaskExecutor) taskExecutor)
.setMaxPoolSize(poolSize))
.acceptIfCondition(configurer.timingAdjustmentEnabled, taskExecutor, factory::setTaskExecutor)
.acceptIfNotNull(configurer.clock, factory::setClock);
}
Expand All @@ -324,7 +327,7 @@ public TaskExecutor backoffManagerTaskExecutor() {
KafkaBackOffManagerConfigurer configurer = new KafkaBackOffManagerConfigurer();
configureKafkaBackOffManager(configurer);
return configurer.timingAdjustmentEnabled
? this.componentFactory.taskExecutor()
? new ThreadPoolTaskExecutor()
: task -> {
};
}
Expand Down Expand Up @@ -445,7 +448,9 @@ public KafkaBackOffManagerConfigurer setClock(Clock clock) {
public static class CustomizersConfigurer {

private Consumer<CommonErrorHandler> errorHandlerCustomizer;

private Consumer<ConcurrentMessageListenerContainer<?, ?>> listenerContainerCustomizer;

private Consumer<DeadLetterPublishingRecoverer> deadLetterPublishingRecovererCustomizer;

/**
Expand Down

1 comment on commit db68ab7

@artembilan
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Please sign in to comment.