diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java index 5d461bcf..927bf44c 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java @@ -126,7 +126,9 @@ private void registerContainer(String beanName, Object bean) { private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); - + + container.setRocketMQMessageListener(annotation); + String nameServer = environment.resolvePlaceholders(annotation.nameServer()); nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer; String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); @@ -135,8 +137,11 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } container.setTopic(environment.resolvePlaceholders(annotation.topic())); + String tags = environment.resolvePlaceholders(annotation.selectorExpression()); + if (!StringUtils.isEmpty(tags)) { + container.setSelectorExpression(tags); + } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); - container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener) bean); container.setObjectMapper(objectMapper); container.setName(name); // REVIEW ME, use the same clientId or multiple? diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index c4b5d197..497d94ba 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -204,6 +204,10 @@ public SelectorType getSelectorType() { return selectorType; } + public void setSelectorExpression(String selectorExpression) { + this.selectorExpression = selectorExpression; + } + public String getSelectorExpression() { return selectorExpression; } diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java index 18fdcca0..9fa0fa41 100644 --- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java @@ -130,7 +130,7 @@ public void testRocketMQListenerWithSeveralObjectMappers() { @Test public void testExtRocketMQTemplate() { runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876"). - withUserConfiguration(ExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class). + withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class). run(new ContextConsumer() { @Override public void accept(AssertableApplicationContext context) throws Throwable { @@ -141,7 +141,7 @@ public void accept(AssertableApplicationContext context) throws Throwable { }); runner.withPropertyValues("rocketmq.name-server=127.0.1.1:9876"). - withUserConfiguration(ExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class). + withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class). run((context) -> { // No producer on consume side assertThat(context).getBean("extRocketMQTemplate").hasFieldOrProperty("producer"); @@ -170,23 +170,44 @@ public void testRocketMQTransactionListener() { "demo.rocketmq.transaction.producer.group=transaction-group1"). withUserConfiguration(TestTransactionListenerConfig.class). run((context) -> { - assertThat(context).hasSingleBean(MyRocketMQLocalTransactionListener.class); + assertThat(context).hasSingleBean(TestRocketMQLocalTransactionListener.class); }); } + @Test + public void testPlaceholdersListenerContainer() { + runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876", + "demo.placeholders.consumer.group = abc3", + "demo.placeholders.consumer.topic = test", + "demo.placeholders.consumer.tags = tag1"). + withUserConfiguration(TestPlaceholdersConfig.class). + run((context) -> { + // No producer on consume side + assertThat(context).doesNotHaveBean(DefaultMQProducer.class); + // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener + assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1"); + assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1"). + hasFieldOrPropertyWithValue("nameServer", "127.0.0.1:9876"). + hasFieldOrPropertyWithValue("consumerGroup", "abc3"). + hasFieldOrPropertyWithValue("topic", "test"). + hasFieldOrPropertyWithValue("selectorExpression", "tag1"); + }); + } + + @Configuration static class TestConfig { @Bean public Object consumeListener() { - return new MyMessageListener(); + return new TestDefaultNameServerListener(); } @Bean public Object consumeListener1() { - return new MyMessageListener1(); + return new TestCustomNameServerListener(); } } @@ -217,7 +238,7 @@ public ObjectMapper rocketMQMessageObjectMapper() { } @RocketMQMessageListener(consumerGroup = "abc", topic = "test") - static class MyMessageListener implements RocketMQListener { + static class TestDefaultNameServerListener implements RocketMQListener { @Override public void onMessage(Object message) { @@ -226,7 +247,7 @@ public void onMessage(Object message) { } @RocketMQMessageListener(nameServer = "127.0.1.1:9876", consumerGroup = "abc1", topic = "test") - static class MyMessageListener1 implements RocketMQListener { + static class TestCustomNameServerListener implements RocketMQListener { @Override public void onMessage(Object message) { @@ -238,13 +259,13 @@ public void onMessage(Object message) { static class TestTransactionListenerConfig { @Bean public Object rocketMQLocalTransactionListener() { - return new MyRocketMQLocalTransactionListener(); + return new TestRocketMQLocalTransactionListener(); } } @RocketMQTransactionListener(txProducerGroup = "${demo.rocketmq.transaction.producer.group}") - static class MyRocketMQLocalTransactionListener implements RocketMQLocalTransactionListener { + static class TestRocketMQLocalTransactionListener implements RocketMQLocalTransactionListener { @Override @@ -259,18 +280,37 @@ public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { } @Configuration - static class ExtRocketMQTemplateConfig { + static class TestExtRocketMQTemplateConfig { @Bean public RocketMQTemplate extRocketMQTemplate() { - return new MyExtRocketMQTemplate(); + return new TestExtRocketMQTemplate(); } } @ExtRocketMQTemplateConfiguration(group = "test", nameServer = "127.0.0.1:9876") - static class MyExtRocketMQTemplate extends RocketMQTemplate { + static class TestExtRocketMQTemplate extends RocketMQTemplate { + + } + @Configuration + static class TestPlaceholdersConfig { + + @Bean + public Object consumeListener() { + return new TestPlaceholdersListener(); + } + + } + + @RocketMQMessageListener(consumerGroup = "${demo.placeholders.consumer.group}", topic = "${demo.placeholders.consumer.topic}", selectorExpression = "${demo.placeholders.consumer.tags}") + static class TestPlaceholdersListener implements RocketMQListener { + + @Override + public void onMessage(Object message) { + + } } }