Skip to content

Commit

Permalink
Merge pull request #216 from RongtongJin/master-2
Browse files Browse the repository at this point in the history
[ISSUE #215] Polish the code and modify version
  • Loading branch information
vongosling committed Dec 27, 2019
2 parents 1aea7e1 + 29a8be0 commit 0c83322
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 136 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-all</artifactId>
<version>2.0.5-SNAPSHOT</version>
<version>2.1.0-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Apache RocketMQ Spring Boot ${project.version}</name>
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-spring-boot-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-all</artifactId>
<version>2.0.5-SNAPSHOT</version>
<version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand All @@ -38,7 +38,7 @@
<spring.boot.version>2.0.5.RELEASE</spring.boot.version>
<spring.version>5.1.0.RELEASE</spring.version>

<rocketmq.spring.boot.version>2.0.5-SNAPSHOT</rocketmq.spring.boot.version>
<rocketmq.spring.boot.version>2.1.0-SNAPSHOT</rocketmq.spring.boot.version>

<rocketmq-version>4.6.0</rocketmq-version>
<slf4j.version>1.7.25</slf4j.version>
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-spring-boot-samples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
</modules>

<properties>
<rocketmq-spring-boot-starter-version>2.0.5-SNAPSHOT</rocketmq-spring-boot-starter-version>
<rocketmq-spring-boot-starter-version>2.1.0-SNAPSHOT</rocketmq-spring-boot-starter-version>
</properties>

<dependencies>
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-parent</artifactId>
<version>2.0.5-SNAPSHOT</version>
<version>2.1.0-SNAPSHOT</version>
<relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
</parent>

<artifactId>rocketmq-spring-boot-starter</artifactId>
<packaging>jar</packaging>

<name>RocketMQ Spring Boot Starter</name>
<description>SRocketMQ Spring Boot Starter</description>
<description>RocketMQ Spring Boot Starter</description>
<url>https://github.com/apache/rocketmq-spring</url>

<dependencies>
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-spring-boot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-parent</artifactId>
<version>2.0.5-SNAPSHOT</version>
<version>2.1.0-SNAPSHOT</version>
<relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
</parent>

<artifactId>rocketmq-spring-boot</artifactId>
<packaging>jar</packaging>

<name>RocketMQ Spring Boot AutoConfigure</name>
<description>SRocketMQ Spring Boot AutoConfigure</description>
<description>RocketMQ Spring Boot AutoConfigure</description>
<url>https://github.com/apache/rocketmq-spring</url>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,14 @@

package org.apache.rocketmq.spring.autoconfigure;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
Expand Down Expand Up @@ -104,7 +99,6 @@ private void registerTemplate(String beanName, Object bean) {
}

private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {
DefaultMQProducer producer = null;

RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
if (producerConfig == null) {
Expand All @@ -118,35 +112,15 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota
ak = StringUtils.isEmpty(ak) ? producerConfig.getAccessKey() : annotation.accessKey();
String sk = environment.resolvePlaceholders(annotation.secretKey());
sk = StringUtils.isEmpty(sk) ? producerConfig.getSecretKey() : annotation.secretKey();
boolean isEnableMsgTrace = annotation.enableMsgTrace();
String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic());
customizedTraceTopic = StringUtils.isEmpty(customizedTraceTopic) ? producerConfig.getCustomizedTraceTopic() : customizedTraceTopic;

boolean isEnableAcl = !StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk);

if (isEnableAcl) {
producer = new TransactionMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
producer.setVipChannelEnabled(false);
} else {
producer = new TransactionMQProducer(groupName);
}

if (annotation.enableMsgTrace()) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, isEnableAcl ? new AclClientRPCHook(new SessionCredentials(ak, sk)) : null);
dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
Field field = DefaultMQProducer.class.getDeclaredField("traceDispatcher");
field.setAccessible(true);
field.set(producer, dispatcher);
producer.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(dispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);

producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(annotation.sendMessageTimeout() == -1 ? producerConfig.getSendMessageTimeout() : annotation.sendMessageTimeout());
producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendAsyncFailed());
producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendAsyncFailed() : annotation.retryTimesWhenSendAsyncFailed());
producer.setMaxMessageSize(annotation.maxMessageSize() == -1 ? producerConfig.getMaxMessageSize() : annotation.maxMessageSize());
producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@

package org.apache.rocketmq.spring.autoconfigure;

import java.lang.reflect.Field;
import javax.annotation.PostConstruct;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -84,30 +79,12 @@ public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties

String accessChannel = rocketMQProperties.getAccessChannel();

DefaultMQProducer producer;
String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey();
boolean isEnableAcl = !StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk);
if (isEnableAcl) {
producer = new TransactionMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
producer.setVipChannelEnabled(false);
} else {
producer = new TransactionMQProducer(groupName);
}
boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();
String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();

if (rocketMQProperties.getProducer().isEnableMsgTrace()) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(rocketMQProperties.getProducer().getCustomizedTraceTopic(), isEnableAcl ? new AclClientRPCHook(new SessionCredentials(ak, sk)) : null);
dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
Field field = DefaultMQProducer.class.getDeclaredField("traceDispatcher");
field.setAccessible(true);
field.set(producer, dispatcher);
producer.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(dispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);

producer.setNamesrvAddr(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class RocketMQProperties {
private String nameServer;

/**
* Enum type for accesChannel, values: LOCAL, CLOUD
* Enum type for accessChannel, values: LOCAL, CLOUD
*/
private String accessChannel;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,14 @@ public SelectorType getSelectorType() {
return selectorType;
}

public void setSelectorExpression(String selectorExpression) {
this.selectorExpression = selectorExpression;
}

public String getSelectorExpression() {
return selectorExpression;
}

public void setSelectorExpression(String selectorExpression) {
this.selectorExpression = selectorExpression;
}

public MessageModel getMessageModel() {
return messageModel;
}
Expand Down Expand Up @@ -296,7 +296,6 @@ public int getPhase() {
return Integer.MAX_VALUE;
}


@Override
public void afterPropertiesSet() throws Exception {
initRocketMQPushConsumer();
Expand Down Expand Up @@ -328,53 +327,6 @@ public void setName(String name) {
this.name = name;
}

public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
rocketMQListener.onMessage(doConvertMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}", messageExt, e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}

public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

@SuppressWarnings("unchecked")
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
rocketMQListener.onMessage(doConvertMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}", messageExt, e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}

return ConsumeOrderlyStatus.SUCCESS;
}
}


@SuppressWarnings("unchecked")
private Object doConvertMessage(MessageExt messageExt) {
if (Objects.equals(messageType, MessageExt.class)) {
Expand Down Expand Up @@ -402,7 +354,6 @@ private Object doConvertMessage(MessageExt messageExt) {
}
}


private MethodParameter getMethodParameter() {
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
Type messageType = this.getMessageType();
Expand Down Expand Up @@ -470,7 +421,7 @@ private void initRocketMQPushConsumer() throws MQClientException {
} else {
log.debug("Access-key or secret-key not configure in " + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
this.applicationContext.getEnvironment().
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}

Expand Down Expand Up @@ -528,4 +479,50 @@ private void initRocketMQPushConsumer() throws MQClientException {

}

public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
rocketMQListener.onMessage(doConvertMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}", messageExt, e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}

public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

@SuppressWarnings("unchecked")
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
rocketMQListener.onMessage(doConvertMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}", messageExt, e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}

return ConsumeOrderlyStatus.SUCCESS;
}
}

}
Loading

0 comments on commit 0c83322

Please sign in to comment.