Skip to content

Commit

Permalink
spring-projectsGH-204: DSL Register container as bean if needed
Browse files Browse the repository at this point in the history
Fixes spring-attic/spring-integration-kafka#204

When an external container is provided to the DSL, register it as a bean
if it is not already a bean.

Polishing id from PR comments; add gateway support too.

* Polishing JavaDocs and omissions in the test-case
  • Loading branch information
garyrussell authored and artembilan committed May 15, 2018
1 parent 7520330 commit 9bee534
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
}

/**
* Create an initial {@link KafkaMessageDrivenChannelAdapterSpec}.
* Create an initial {@link KafkaMessageDrivenChannelAdapterSpec}. If the listener
* container is not already a bean it will be registered in the application context.
* If the adapter spec has an {@code id}, the bean name will be that id appended with
* '.container'. Otherwise, the bean name will be generated from the container class
* name.
* @param listenerContainer the {@link AbstractMessageListenerContainer}.
* @param <K> the Kafka message key type.
* @param <V> the Kafka message value type.
Expand All @@ -117,7 +121,11 @@ public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
}

/**
* Create an initial {@link KafkaMessageDrivenChannelAdapterSpec}.
* Create an initial {@link KafkaMessageDrivenChannelAdapterSpec}. If the listener
* container is not already a bean it will be registered in the application context.
* If the adapter spec has an {@code id}, the bean name will be that id appended with
* '.container'. Otherwise, the bean name will be generated from the container class
* name.
* @param listenerContainer the {@link AbstractMessageListenerContainer}.
* @param listenerMode the {@link KafkaMessageDrivenChannelAdapter.ListenerMode}.
* @param <K> the Kafka message key type.
Expand Down Expand Up @@ -316,7 +324,10 @@ public static <K, V, R> KafkaOutboundGatewaySpec.KafkaGatewayMessageHandlerTempl

/**
* Create an initial {@link KafkaInboundGatewaySpec} with the provided container and
* template.
* template. If the listener container is not already a bean it will be registered in
* the application context. If the adapter spec has an {@code id}, the bean name will
* be that id appended with '.container'. Otherwise, the bean name will be generated
* from the container class name.
* @param container the container.
* @param template the template.
* @param <K> the Kafka message key type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.integration.kafka.dsl;

import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;

Expand Down Expand Up @@ -44,12 +45,16 @@
* @since 3.0.2
*/
public class KafkaInboundGatewaySpec<K, V, R, S extends KafkaInboundGatewaySpec<K, V, R, S>>
extends MessagingGatewaySpec<S, KafkaInboundGateway<K, V, R>> {
extends MessagingGatewaySpec<S, KafkaInboundGateway<K, V, R>>
implements ComponentsRegistration {

private final AbstractMessageListenerContainer<K, V> container;

KafkaInboundGatewaySpec(AbstractMessageListenerContainer<K, V> messageListenerContainer,
KafkaTemplate<K, R> kafkaTemplate) {

super(new KafkaInboundGateway<>(messageListenerContainer, kafkaTemplate));
this.container = messageListenerContainer;
}

/**
Expand Down Expand Up @@ -86,6 +91,11 @@ public S recoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
return _this();
}

@Override
public Map<Object, String> getComponentsToRegister() {
return Collections.singletonMap(this.container, getId() == null ? null : getId() + ".container");
}

/**
* A {@link ConcurrentMessageListenerContainer} configuration {@link KafkaInboundGatewaySpec}
* extension.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,11 +46,15 @@
* @since 3.0
*/
public class KafkaMessageDrivenChannelAdapterSpec<K, V, S extends KafkaMessageDrivenChannelAdapterSpec<K, V, S>>
extends MessageProducerSpec<S, KafkaMessageDrivenChannelAdapter<K, V>> {
extends MessageProducerSpec<S, KafkaMessageDrivenChannelAdapter<K, V>>
implements ComponentsRegistration {

private final AbstractMessageListenerContainer<K, V> container;

KafkaMessageDrivenChannelAdapterSpec(AbstractMessageListenerContainer<K, V> messageListenerContainer,
KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode) {
super(new KafkaMessageDrivenChannelAdapter<>(messageListenerContainer, listenerMode));
this.container = messageListenerContainer;
}

/**
Expand Down Expand Up @@ -152,6 +156,11 @@ public S filterInRetry(boolean filterInRetry) {
return _this();
}

@Override
public Map<Object, String> getComponentsToRegister() {
return Collections.singletonMap(this.container, getId() == null ? null : getId() + ".container");
}

/**
* A {@link ConcurrentMessageListenerContainer} configuration {@link KafkaMessageDrivenChannelAdapterSpec}
* extension.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
Expand Down Expand Up @@ -100,8 +102,8 @@ public class KafkaDslTests {
private static final String TEST_TOPIC5 = "test-topic5";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC1, TEST_TOPIC2, TEST_TOPIC3,
TEST_TOPIC4, TEST_TOPIC5);
public static KafkaEmbedded embeddedKafka =
new KafkaEmbedded(1, true, TEST_TOPIC1, TEST_TOPIC2, TEST_TOPIC3, TEST_TOPIC4, TEST_TOPIC5);

@Autowired
@Qualifier("sendToKafkaFlow.input")
Expand Down Expand Up @@ -212,7 +214,7 @@ public void testKafkaAdapters() throws Exception {

@Test
public void testGateways() throws Exception {
assertThat(this.config.replyContainerLatch.await(30, TimeUnit.SECONDS));
assertThat(this.config.replyContainerLatch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(this.gate.exchange(TEST_TOPIC4, "foo")).isEqualTo("FOO");
}

Expand Down Expand Up @@ -259,16 +261,23 @@ public IntegrationFlow topic1ListenerFromKafkaFlow() {
.get();
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
factory.setRecoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()));
factory.setRetryTemplate(new RetryTemplate());
return factory;
}

@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC2)
.configureListenerContainer(c ->
c.ackMode(ContainerProperties.AckMode.MANUAL))
.recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
KafkaMessageDrivenChannelAdapter.ListenerMode.record)
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
Expand Down Expand Up @@ -380,4 +389,5 @@ public interface Gate {
String exchange(@Header(KafkaHeaders.TOPIC) String topic, String out);

}

}

0 comments on commit 9bee534

Please sign in to comment.