Skip to content

Commit

Permalink
spring-projectsGH-2825: Pre-review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
frosiere committed Oct 16, 2023
1 parent 40f8732 commit 0b30542
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,18 @@ public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
}
----

Starting with version 3.1.0, it's also possible to apply the same kind of customization on a single listener by specifying the bean name of a
'ContainerPostProcessor' on the KafkaListener annotation.

[source, java]
----
@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
return container -> { /* customize the container */ };
}
...
@KafkaListener(... containerPostProcessor="customContainerPostProcessor" ...)
----

Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,12 @@
String info() default "";

/**
* Set the bean name of a {@link org.springframework.kafka.config.ContainerPostProcessor} to allow customizing the
* container after its creation and configuration. This post processor is only applied on the current listener container
* in contrast to the {@link org.springframework.kafka.config.ContainerCustomizer} which is applied on all listener containers.
* Set the bean name of a {@link org.springframework.kafka.config.ContainerPostProcessor}
* to allow customizing the container after its creation and configuration. This post
* processor is only applied on the current listener container in contrast to the
* {@link org.springframework.kafka.config.ContainerCustomizer} which is applied on all
* listener containers.
*
* @return the bean name of the container post processor.
* @since 3.1.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,10 +742,12 @@ private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListener k
return factory;
}

private void resolveContainerPostProcessor(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
private void resolveContainerPostProcessor(MethodKafkaListenerEndpoint<?, ?> endpoint,
KafkaListener kafkaListener) {
final String containerPostProcessor = kafkaListener.containerPostProcessor();
if (StringUtils.hasText(containerPostProcessor)) {
endpoint.setContainerPostProcessor(this.beanFactory.getBean(containerPostProcessor, ContainerPostProcessor.class));
endpoint.setContainerPostProcessor(this.beanFactory.getBean(containerPostProcessor,
ContainerPostProcessor.class));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ protected void customizeContainer(C instance, KafkaListenerEndpoint endpoint) {
if (this.containerCustomizer != null) {
this.containerCustomizer.configure(instance);
}
final ContainerPostProcessor<K, V, C> containerPostProcessor = (ContainerPostProcessor<K, V, C>) endpoint.getContainerPostProcessor();
final ContainerPostProcessor<K, V, C> containerPostProcessor = (ContainerPostProcessor<K, V, C>)
endpoint.getContainerPostProcessor();
if (containerPostProcessor != null) {
containerPostProcessor.postProcess(instance);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

/**
* Called by the container factory after the container is created and configured. This post processor is applied
* on the listener through the {@link KafkaListener#containerPostProcessor()} attribute.
* Called by the container factory after the container is created and configured. This
* post processor is applied on the listener through the
* {@link KafkaListener#containerPostProcessor()} attribute.
* <p>
* A {@link ContainerCustomizer} can be used when customization must be applied to all containers.
* A {@link ContainerCustomizer} can be used when customization must be applied to all
* containers.
*
* @param <K> the key type.
* @param <V> the value type.
Expand All @@ -34,7 +36,6 @@
*
* @see ContainerCustomizer
* @see KafkaListener
*
*/
@FunctionalInterface
public interface ContainerPostProcessor<K, V, C extends AbstractMessageListenerContainer<K, V>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ default Boolean getBatchListener() {
}

/**
* Return the {@link ContainerPostProcessor} for this endpoint, or null if not explicitly set.
* Return the {@link ContainerPostProcessor} for this endpoint, or null if not
* explicitly set.
* @return the container post processor.
* @since 3.1.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,22 @@ class ContainerCustomizationTest {
private static final String CONTAINER_CUSTOMIZER = "container-customizer";
private static final String POST_PROCESSOR = "post-processor";
private static final String POST_PROCESSOR_MULTI_METHOD = "post-processor-multi-method";
private static final String CONTAINER_CUSTOMIZER_AND_POST_PROCESSOR = "container-customizer-and-post-processor";
private static final String CONTAINER_CUSTOMIZER_AND_POST_PROCESSOR = "container-customizer" +
"-and-post-processor";

@Autowired
private ListenerContainerRegistry listenerContainerRegistry;

private static Stream<Arguments> listenerIdsWithRelatedInfo() {
return Stream.of(
Arguments.of(DEFAULT_LISTENER, null),
Arguments.of(CONTAINER_CUSTOMIZER, CONTAINER_CUSTOMIZER.getBytes(StandardCharsets.UTF_8)),
Arguments.of(CONTAINER_CUSTOMIZER,
CONTAINER_CUSTOMIZER.getBytes(StandardCharsets.UTF_8)),
Arguments.of(POST_PROCESSOR, POST_PROCESSOR.getBytes(StandardCharsets.UTF_8)),
Arguments.of(POST_PROCESSOR_MULTI_METHOD, POST_PROCESSOR.getBytes(StandardCharsets.UTF_8)),
Arguments.of(CONTAINER_CUSTOMIZER_AND_POST_PROCESSOR, POST_PROCESSOR.getBytes(StandardCharsets.UTF_8)));
Arguments.of(POST_PROCESSOR_MULTI_METHOD,
POST_PROCESSOR.getBytes(StandardCharsets.UTF_8)),
Arguments.of(CONTAINER_CUSTOMIZER_AND_POST_PROCESSOR,
POST_PROCESSOR.getBytes(StandardCharsets.UTF_8)));
}

@ParameterizedTest
Expand All @@ -77,7 +82,10 @@ void testCustomization(String listenerId, byte[] listenerInfo) {
assertThat(listenerContainer.getListenerInfo()).isEqualTo(listenerInfo);
}

@KafkaListener(id = POST_PROCESSOR_MULTI_METHOD, topics = TOPIC, containerPostProcessor = "infoContainerPostProcessor")
@KafkaListener(id = POST_PROCESSOR_MULTI_METHOD,
topics = TOPIC,
containerPostProcessor = "infoContainerPostProcessor"
)
static class MultiMethodListener {

@KafkaHandler
Expand Down Expand Up @@ -127,7 +135,8 @@ public MultiMethodListener multiMethodListener() {
}

@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> infoContainerPostProcessor() {
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String,
String>> infoContainerPostProcessor() {
return container -> container.setListenerInfo(POST_PROCESSOR.getBytes(StandardCharsets.UTF_8));
}

Expand All @@ -146,7 +155,9 @@ public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerCont
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactoryWithCustomizer() {
final var containerFactory = createKafkaListenerContainerFactory();
containerFactory.setContainerCustomizer(container -> container.setListenerInfo(CONTAINER_CUSTOMIZER.getBytes(StandardCharsets.UTF_8)));
containerFactory.setContainerCustomizer(container ->
container.setListenerInfo(CONTAINER_CUSTOMIZER.getBytes(StandardCharsets.UTF_8))
);
return containerFactory;
}

Expand Down

0 comments on commit 0b30542

Please sign in to comment.