Skip to content

Commit

Permalink
[ISSUE #218] Fix spring scopeTarget will repeat consumer instance (#210)
Browse files Browse the repository at this point in the history
* change clientId algorithm

* code format

* develop

* optimize on 2.0.5.EINSITANG

* revert pom version

* change note

* change note

* revert demo.rocketmq.myNameServer

* remove clientInstaceName

* remove unuse method

* pass ci-check

* remove pass annotation

* correct variable word

* optimize annotation

* merge

Co-authored-by: von gosling <vongosling@apache.org>
  • Loading branch information
forfuns and vongosling committed Feb 10, 2020
1 parent c7f230f commit 2853384
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.apache.rocketmq.spring.support.SpringBeanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
Expand Down Expand Up @@ -52,20 +53,20 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
private RocketMQMessageConverter rocketMQMessageConverter;

public ExtProducerResetConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext)applicationContext;
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}

@Override
public void afterSingletonsInstantiated() {
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class);
Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, ExtRocketMQTemplateConfiguration.class);

if (Objects.nonNull(beans)) {
beans.forEach(this::registerTemplate);
Expand All @@ -80,7 +81,7 @@ private void registerTemplate(String beanName, Object bean) {
}

ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
validate(annotation, genericApplicationContext);

DefaultMQProducer mqProducer = createProducer(annotation);
Expand All @@ -92,7 +93,7 @@ private void registerTemplate(String beanName, Object bean) {
throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
beanName), e);
}
RocketMQTemplate rocketMQTemplate = (RocketMQTemplate)bean;
RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
rocketMQTemplate.setProducer(mqProducer);
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
log.info("Set real producer to :{} {}", beanName, annotation.value());
Expand Down Expand Up @@ -130,7 +131,7 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota
}

private void validate(ExtRocketMQTemplateConfiguration annotation,
GenericApplicationContext genericApplicationContext) {
GenericApplicationContext genericApplicationContext) {
if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
"please check the @ExtRocketMQTemplateConfiguration",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.rocketmq.spring.autoconfigure;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
Expand All @@ -29,6 +25,7 @@
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.SpringBeanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
Expand All @@ -43,6 +40,11 @@
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.StringUtils;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);
Expand All @@ -58,7 +60,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
private RocketMQMessageConverter rocketMQMessageConverter;

public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
Expand All @@ -71,7 +73,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws

@Override
public void afterSingletonsInstantiated() {
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQMessageListener.
class);

if (Objects.nonNull(beans)) {
beans.forEach(this::registerContainer);
Expand Down Expand Up @@ -127,7 +130,7 @@ private void registerContainer(String beanName, Object bean) {
}

private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

container.setRocketMQMessageListener(annotation);
Expand All @@ -145,13 +148,15 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));

if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQListener((RocketMQListener) bean);
} else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQReplyListener((RocketMQReplyListener) bean);
}

container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name); // REVIEW ME, use the same clientId or multiple?
container.setName(name);

return container;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.apache.rocketmq.spring.support.SpringBeanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
Expand All @@ -44,12 +46,14 @@ public class RocketMQTransactionConfiguration implements ApplicationContextAware

private ConfigurableApplicationContext applicationContext;

@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}

@Override public void afterSingletonsInstantiated() {
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class);
@Override
public void afterSingletonsInstantiated() {
Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQTransactionListener.class);

if (Objects.nonNull(beans)) {
beans.forEach(this::registerTransactionListener);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.spring.support;

import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.lang.NonNull;

import java.lang.annotation.Annotation;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SpringBeanUtil {

/**
* Override applicationContext.getBeansWithAnnotation method to make sure without same ProxyTarget beans
*
* @param applicationContext spring Application Context
* @param clazz annotation class
* @return beans map without proxyTarget bean
*/
public static Map<String, Object> getBeansWithAnnotation(@NonNull ConfigurableApplicationContext applicationContext, Class<? extends Annotation> clazz) {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(clazz);
Map<String, Object> filterBeans = new HashMap<>(beans.size());
// remove proxy target
Set<Map.Entry<String, Object>> entrySet = beans.entrySet();
entrySet.forEach((entry) -> {
final String beanName = entry.getKey();
if (!ScopedProxyUtils.isScopedTarget(beanName)) {
filterBeans.put(beanName, entry.getValue());
}
});
return filterBeans;
}

}

0 comments on commit 2853384

Please sign in to comment.