Skip to content

Commit

Permalink
Merge pull request #129 from liuliuzo/master
Browse files Browse the repository at this point in the history
[ISSUE #103]Support resolvePlaceholders for selectorExpression
  • Loading branch information
duhenglucky committed Oct 31, 2019
2 parents 908ea48 + 24ac48d commit 71a1e9c
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ private void registerContainer(String beanName, Object bean) {

private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();


container.setRocketMQMessageListener(annotation);

String nameServer = environment.resolvePlaceholders(annotation.nameServer());
nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
Expand All @@ -135,8 +137,11 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
String tags = environment.resolvePlaceholders(annotation.selectorExpression());
if (!StringUtils.isEmpty(tags)) {
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
container.setRocketMQMessageListener(annotation);
container.setRocketMQListener((RocketMQListener) bean);
container.setObjectMapper(objectMapper);
container.setName(name); // REVIEW ME, use the same clientId or multiple?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ public SelectorType getSelectorType() {
return selectorType;
}

public void setSelectorExpression(String selectorExpression) {
this.selectorExpression = selectorExpression;
}

public String getSelectorExpression() {
return selectorExpression;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testRocketMQListenerWithSeveralObjectMappers() {
@Test
public void testExtRocketMQTemplate() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
withUserConfiguration(ExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
run(new ContextConsumer<AssertableApplicationContext>() {
@Override
public void accept(AssertableApplicationContext context) throws Throwable {
Expand All @@ -141,7 +141,7 @@ public void accept(AssertableApplicationContext context) throws Throwable {
});

runner.withPropertyValues("rocketmq.name-server=127.0.1.1:9876").
withUserConfiguration(ExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
run((context) -> {
// No producer on consume side
assertThat(context).getBean("extRocketMQTemplate").hasFieldOrProperty("producer");
Expand Down Expand Up @@ -170,23 +170,44 @@ public void testRocketMQTransactionListener() {
"demo.rocketmq.transaction.producer.group=transaction-group1").
withUserConfiguration(TestTransactionListenerConfig.class).
run((context) -> {
assertThat(context).hasSingleBean(MyRocketMQLocalTransactionListener.class);
assertThat(context).hasSingleBean(TestRocketMQLocalTransactionListener.class);

});

}

@Test
public void testPlaceholdersListenerContainer() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
"demo.placeholders.consumer.group = abc3",
"demo.placeholders.consumer.topic = test",
"demo.placeholders.consumer.tags = tag1").
withUserConfiguration(TestPlaceholdersConfig.class).
run((context) -> {
// No producer on consume side
assertThat(context).doesNotHaveBean(DefaultMQProducer.class);
// Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1");
assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1").
hasFieldOrPropertyWithValue("nameServer", "127.0.0.1:9876").
hasFieldOrPropertyWithValue("consumerGroup", "abc3").
hasFieldOrPropertyWithValue("topic", "test").
hasFieldOrPropertyWithValue("selectorExpression", "tag1");
});
}


@Configuration
static class TestConfig {

@Bean
public Object consumeListener() {
return new MyMessageListener();
return new TestDefaultNameServerListener();
}

@Bean
public Object consumeListener1() {
return new MyMessageListener1();
return new TestCustomNameServerListener();
}

}
Expand Down Expand Up @@ -217,7 +238,7 @@ public ObjectMapper rocketMQMessageObjectMapper() {
}

@RocketMQMessageListener(consumerGroup = "abc", topic = "test")
static class MyMessageListener implements RocketMQListener {
static class TestDefaultNameServerListener implements RocketMQListener {

@Override
public void onMessage(Object message) {
Expand All @@ -226,7 +247,7 @@ public void onMessage(Object message) {
}

@RocketMQMessageListener(nameServer = "127.0.1.1:9876", consumerGroup = "abc1", topic = "test")
static class MyMessageListener1 implements RocketMQListener {
static class TestCustomNameServerListener implements RocketMQListener {

@Override
public void onMessage(Object message) {
Expand All @@ -238,13 +259,13 @@ public void onMessage(Object message) {
static class TestTransactionListenerConfig {
@Bean
public Object rocketMQLocalTransactionListener() {
return new MyRocketMQLocalTransactionListener();
return new TestRocketMQLocalTransactionListener();
}

}

@RocketMQTransactionListener(txProducerGroup = "${demo.rocketmq.transaction.producer.group}")
static class MyRocketMQLocalTransactionListener implements RocketMQLocalTransactionListener {
static class TestRocketMQLocalTransactionListener implements RocketMQLocalTransactionListener {


@Override
Expand All @@ -259,18 +280,37 @@ public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
}

@Configuration
static class ExtRocketMQTemplateConfig {
static class TestExtRocketMQTemplateConfig {

@Bean
public RocketMQTemplate extRocketMQTemplate() {
return new MyExtRocketMQTemplate();
return new TestExtRocketMQTemplate();
}

}

@ExtRocketMQTemplateConfiguration(group = "test", nameServer = "127.0.0.1:9876")
static class MyExtRocketMQTemplate extends RocketMQTemplate {
static class TestExtRocketMQTemplate extends RocketMQTemplate {

}

@Configuration
static class TestPlaceholdersConfig {

@Bean
public Object consumeListener() {
return new TestPlaceholdersListener();
}

}

@RocketMQMessageListener(consumerGroup = "${demo.placeholders.consumer.group}", topic = "${demo.placeholders.consumer.topic}", selectorExpression = "${demo.placeholders.consumer.tags}")
static class TestPlaceholdersListener implements RocketMQListener {

@Override
public void onMessage(Object message) {

}
}
}

0 comments on commit 71a1e9c

Please sign in to comment.