Skip to content

Commit

Permalink
spring-atticGH-204: DSL Register container as bean if needed
Browse files Browse the repository at this point in the history
Fixes spring-attic#204

When an external container is provided to the DSL, register it as a bean
if it is not already a bean.
  • Loading branch information
garyrussell committed May 14, 2018
1 parent 047d88f commit 43d022b
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ target
/bin/
/classes/
/out/
.DS_Store
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,11 +46,15 @@
* @since 3.0
*/
public class KafkaMessageDrivenChannelAdapterSpec<K, V, S extends KafkaMessageDrivenChannelAdapterSpec<K, V, S>>
extends MessageProducerSpec<S, KafkaMessageDrivenChannelAdapter<K, V>> {
extends MessageProducerSpec<S, KafkaMessageDrivenChannelAdapter<K, V>>
implements ComponentsRegistration {

private final AbstractMessageListenerContainer<K, V> container;

KafkaMessageDrivenChannelAdapterSpec(AbstractMessageListenerContainer<K, V> messageListenerContainer,
KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode) {
super(new KafkaMessageDrivenChannelAdapter<>(messageListenerContainer, listenerMode));
this.container = messageListenerContainer;
}

/**
Expand Down Expand Up @@ -152,6 +156,11 @@ public S filterInRetry(boolean filterInRetry) {
return _this();
}

@Override
public Map<Object, String> getComponentsToRegister() {
return Collections.singletonMap(this.container, getId());
}

/**
* A {@link ConcurrentMessageListenerContainer} configuration {@link KafkaMessageDrivenChannelAdapterSpec}
* extension.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
Expand Down Expand Up @@ -259,16 +261,23 @@ public IntegrationFlow topic1ListenerFromKafkaFlow() {
.get();
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
factory.setRecoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()));
factory.setRetryTemplate(new RetryTemplate());
return factory;
}

@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC2)
.configureListenerContainer(c ->
c.ackMode(ContainerProperties.AckMode.MANUAL))
.recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
KafkaMessageDrivenChannelAdapter.ListenerMode.record)
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
Expand Down

0 comments on commit 43d022b

Please sign in to comment.