diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java index e5e74336..9ea7699b 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; import org.apache.rocketmq.spring.support.RocketMQUtil; +import org.apache.rocketmq.spring.support.SpringBeanUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; @@ -52,7 +53,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S private RocketMQMessageConverter rocketMQMessageConverter; public ExtProducerResetConfiguration(RocketMQMessageConverter rocketMQMessageConverter, - StandardEnvironment environment, RocketMQProperties rocketMQProperties) { + StandardEnvironment environment, RocketMQProperties rocketMQProperties) { this.rocketMQMessageConverter = rocketMQMessageConverter; this.environment = environment; this.rocketMQProperties = rocketMQProperties; @@ -60,12 +61,12 @@ public ExtProducerResetConfiguration(RocketMQMessageConverter rocketMQMessageCon @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = (ConfigurableApplicationContext)applicationContext; + this.applicationContext = (ConfigurableApplicationContext) applicationContext; } @Override public void afterSingletonsInstantiated() { - Map beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class); + Map beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, ExtRocketMQTemplateConfiguration.class); if (Objects.nonNull(beans)) { beans.forEach(this::registerTemplate); @@ -80,7 +81,7 @@ private void registerTemplate(String beanName, Object bean) { } ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class); - GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext; + GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext; validate(annotation, genericApplicationContext); DefaultMQProducer mqProducer = createProducer(annotation); @@ -92,7 +93,7 @@ private void registerTemplate(String beanName, Object bean) { throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}", beanName), e); } - RocketMQTemplate rocketMQTemplate = (RocketMQTemplate)bean; + RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean; rocketMQTemplate.setProducer(mqProducer); rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); log.info("Set real producer to :{} {}", beanName, annotation.value()); @@ -130,7 +131,7 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota } private void validate(ExtRocketMQTemplateConfiguration annotation, - GenericApplicationContext genericApplicationContext) { + GenericApplicationContext genericApplicationContext) { if (genericApplicationContext.isBeanNameInUse(annotation.value())) { throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " + "please check the @ExtRocketMQTemplateConfiguration", diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java index 699474de..008a4dbe 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java @@ -17,10 +17,6 @@ package org.apache.rocketmq.spring.autoconfigure; -import java.util.Collections; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; @@ -29,6 +25,7 @@ import org.apache.rocketmq.spring.core.RocketMQReplyListener; import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; +import org.apache.rocketmq.spring.support.SpringBeanUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; @@ -43,6 +40,11 @@ import org.springframework.core.env.StandardEnvironment; import org.springframework.util.StringUtils; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + @Configuration public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton { private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class); @@ -58,7 +60,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware, private RocketMQMessageConverter rocketMQMessageConverter; public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter, - StandardEnvironment environment, RocketMQProperties rocketMQProperties) { + StandardEnvironment environment, RocketMQProperties rocketMQProperties) { this.rocketMQMessageConverter = rocketMQMessageConverter; this.environment = environment; this.rocketMQProperties = rocketMQProperties; @@ -71,7 +73,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws @Override public void afterSingletonsInstantiated() { - Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); + Map beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQMessageListener. + class); if (Objects.nonNull(beans)) { beans.forEach(this::registerContainer); @@ -127,7 +130,7 @@ private void registerContainer(String beanName, Object bean) { } private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, - RocketMQMessageListener annotation) { + RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setRocketMQMessageListener(annotation); @@ -145,13 +148,15 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String container.setSelectorExpression(tags); } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); + if (RocketMQListener.class.isAssignableFrom(bean.getClass())) { container.setRocketMQListener((RocketMQListener) bean); } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { container.setRocketMQReplyListener((RocketMQReplyListener) bean); } + container.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); - container.setName(name); // REVIEW ME, use the same clientId or multiple? + container.setName(name); return container; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java index 1a897e51..2daefcfc 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java @@ -22,11 +22,13 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQUtil; +import org.apache.rocketmq.spring.support.SpringBeanUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; @@ -44,12 +46,14 @@ public class RocketMQTransactionConfiguration implements ApplicationContextAware private ConfigurableApplicationContext applicationContext; - @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (ConfigurableApplicationContext) applicationContext; } - @Override public void afterSingletonsInstantiated() { - Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class); + @Override + public void afterSingletonsInstantiated() { + Map beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQTransactionListener.class); if (Objects.nonNull(beans)) { beans.forEach(this::registerTransactionListener); diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/SpringBeanUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/SpringBeanUtil.java new file mode 100644 index 00000000..b5d11615 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/SpringBeanUtil.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.support; + +import org.springframework.aop.scope.ScopedProxyUtils; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.lang.NonNull; + +import java.lang.annotation.Annotation; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class SpringBeanUtil { + + /** + * Override applicationContext.getBeansWithAnnotation method to make sure without same ProxyTarget beans + * + * @param applicationContext spring Application Context + * @param clazz annotation class + * @return beans map without proxyTarget bean + */ + public static Map getBeansWithAnnotation(@NonNull ConfigurableApplicationContext applicationContext, Class clazz) { + Map beans = applicationContext.getBeansWithAnnotation(clazz); + Map filterBeans = new HashMap<>(beans.size()); + // remove proxy target + Set> entrySet = beans.entrySet(); + entrySet.forEach((entry) -> { + final String beanName = entry.getKey(); + if (!ScopedProxyUtils.isScopedTarget(beanName)) { + filterBeans.put(beanName, entry.getValue()); + } + }); + return filterBeans; + } + +}