Skip to content

Commit

Permalink
GH-812: Support @AliasFor in @KafkaListener meta
Browse files Browse the repository at this point in the history
Resolves spring-projects/spring-kafka#812

Allow user annotations meta-annotated with `@KafkaListener` to set alias properties.

Also fix exception text when no topic/partition info provided.

* Add docs.
  • Loading branch information
denis554 committed Oct 1, 2018
1 parent 1b6040d commit 0922d0b
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 3 deletions.
Expand Up @@ -52,6 +52,7 @@
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.converter.GenericConverter;
Expand Down Expand Up @@ -319,7 +320,7 @@ private Collection<KafkaListener> findListenerAnnotations(Class<?> clazz) {
*/
private Set<KafkaListener> findListenerAnnotations(Method method) {
Set<KafkaListener> listeners = new HashSet<KafkaListener>();
KafkaListener ann = AnnotationUtils.findAnnotation(method, KafkaListener.class);
KafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class);
if (ann != null) {
listeners.add(ann);
}
Expand Down
Expand Up @@ -108,9 +108,12 @@ protected AbstractMessageListenerContainer(ConsumerFactory<K, V> consumerFactory
else if (containerProperties.getTopicPattern() != null) {
this.containerProperties = new ContainerProperties(containerProperties.getTopicPattern());
}
else {
else if (containerProperties.getTopicPartitions() != null) {
this.containerProperties = new ContainerProperties(containerProperties.getTopicPartitions());
}
else {
throw new IllegalStateException("topics, topicPattern, or topicPartitions must be provided");
}

BeanUtils.copyProperties(containerProperties, this.containerProperties,
"topics", "topicPartitions", "topicPattern", "ackCount", "ackTime");
Expand Down
Expand Up @@ -221,7 +221,7 @@ public enum AckMode {
private boolean missingTopicsFatal = true;

public ContainerProperties(String... topics) {
Assert.notEmpty(topics, "An array of topicPartitions must be provided");
Assert.notEmpty(topics, "An array of topics must be provided");
this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
this.topicPattern = null;
this.topicPartitions = null;
Expand Down
@@ -0,0 +1,132 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import static org.assertj.core.api.Assertions.assertThat;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.AliasFor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Gary Russell
* @since 2.2
*
*/
@SpringJUnitConfig
@DirtiesContext
public class AliasPropertiesTests {

@Autowired
private KafkaTemplate<Integer, String> template;

@Autowired
private Config config;

@Test
public void testAliasFor() throws Exception {
this.template.send("alias.tests", "foo");
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Configuration
@EnableKafka
public static class Config {

private final CountDownLatch latch = new CountDownLatch(1);

@Bean
public EmbeddedKafkaBroker embeddedKafka() {
return new EmbeddedKafkaBroker(1, true, "alias.tests");
}

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}

@Bean
public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps("myAliasGroup", "false", embeddedKafka());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return consumerProps;
}

@Bean
public KafkaTemplate<Integer, String> template() {
return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
return KafkaTestUtils.producerProps(embeddedKafka());
}

@MyListener("alias.tests")
public void listen1(String in) {
latch.countDown();
}

}

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyListener {

@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] value();

}

}
39 changes: 39 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Expand Up @@ -1043,6 +1043,45 @@ For the `ConcurrentMessageListenerContainer`, the `<beanName>` part of the threa
`n` increments each time the container is started.
So, with a bean name of `container`, threads in this container will be named `container-0-C-1`, `container-1-C-1` etc., after the container is started the first time; `container-0-C-2`, `container-1-C-2` etc., after a stop/start.

[[kafka-listener-meta]]
===== @KafkaListener as a Meta Annotation

Starting with version 2.2, you can now use `@KafkaListener` as a meta annotation.
For example:

====
[source, java]
----
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] topics();
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
----
====

You must alias at least one of `topics`, `topicPattern`, or `topicPartitions` (and, usually, `id` or `groupId` unless you have specified a `group.id` in the consumer factory configuration).

====
[source, java]
----
@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
...
}
----
====

[[class-level-kafkalistener]]
===== @KafkaListener on a Class

Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Expand Up @@ -46,6 +46,10 @@ You can now override the `concurrency` and `autoStartup` properties of the liste

See <<kafka-listener-annotation>> for more information.

You can now use `@KafkaListener` as a meta-annotation on your own annotations.

See <<kafka-listener-meta>> for more information.

==== Header Mapping Changes

Headers of type `MimeType` and `MediaType` are now mapped as simple strings in the `RecordHeader` value.
Expand Down

0 comments on commit 0922d0b

Please sign in to comment.