Skip to content

Commit

Permalink
Merge 0ccbfa6 into 7193566
Browse files Browse the repository at this point in the history
  • Loading branch information
chenghm123 committed Feb 21, 2021
2 parents 7193566 + 0ccbfa6 commit 8eb32d5
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 342 deletions.
Expand Up @@ -35,21 +35,17 @@
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -246,7 +242,8 @@ public <T> T sendAndReceive(String destination, Message<?> message, Type type, S
} else {
replyMessage = (MessageExt) producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
}
return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null;
return replyMessage != null ? (T) RocketMQUtil.doConvertMessage(replyMessage, type, null,
this.getMessageConverter(), charset) : null;
} catch (Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
Expand Down Expand Up @@ -436,7 +433,9 @@ public void sendAndReceive(String destination, Message<?> message,
if (rocketMQLocalRequestCallback != null) {
requestCallback = new RequestCallback() {
@Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));
Type messageType = RocketMQUtil.getMessageType(rocketMQLocalRequestCallback, RocketMQLocalRequestCallback.class);
rocketMQLocalRequestCallback.onSuccess(RocketMQUtil.doConvertMessage((MessageExt) message,
messageType, null, getMessageConverter(), charset));
}

@Override public void onException(Throwable e) {
Expand Down Expand Up @@ -986,60 +985,6 @@ private org.apache.rocketmq.common.message.Message createRocketMqMessage(
destination, msg);
}

private Object doConvertMessage(MessageExt messageExt, Type type) {
if (Objects.equals(type, MessageExt.class)) {
return messageExt;
} else if (Objects.equals(type, byte[].class)) {
return messageExt.getBody();
} else {
String str = new String(messageExt.getBody(), Charset.forName(charset));
if (Objects.equals(type, String.class)) {
return str;
} else {
// If msgType not string, use objectMapper change it.
try {
if (type instanceof Class) {
//if the messageType has not Generic Parameter
return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type);
} else {
//if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
//we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
}
} catch (Exception e) {
log.error("convert failed. str:{}, msgType:{}", str, type);
throw new RuntimeException("cannot convert message to " + type, e);
}
}
}
}

private Type getMessageType(RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQLocalRequestCallback);
Type matchedGenericInterface = null;
while (Objects.nonNull(targetClass)) {
Type[] interfaces = targetClass.getGenericInterfaces();
if (Objects.nonNull(interfaces)) {
for (Type type : interfaces) {
if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) {
matchedGenericInterface = type;
break;
}
}
}
targetClass = targetClass.getSuperclass();
}
if (Objects.isNull(matchedGenericInterface)) {
return Object.class;
}

Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return actualTypeArguments[0];
}
return Object.class;
}

private MessageBatch batch(Collection<org.apache.rocketmq.common.message.Message> msgs) throws MQClientException {
MessageBatch msgBatch;
try {
Expand Down Expand Up @@ -1080,28 +1025,10 @@ public <T> List<T> receive(Class<T> clazz, long timeout) {
List<MessageExt> messageExts = this.consumer.poll(timeout);
List<T> list = new ArrayList<>(messageExts.size());
for (MessageExt messageExt : messageExts) {
list.add(doConvertMessage(messageExt, clazz));
list.add((T) RocketMQUtil.doConvertMessage(messageExt, clazz,
null, getMessageConverter(), charset));
}
return list;
}

@SuppressWarnings("unchecked")
private <T> T doConvertMessage(MessageExt messageExt, Class<T> messageType) {
if (Objects.equals(messageType, MessageExt.class)) {
return (T) messageExt;
} else {
String str = new String(messageExt.getBody(), Charset.forName(charset));
if (Objects.equals(messageType, String.class)) {
return (T) str;
} else {
// If msgType not string, use objectMapper change it.
try {
return (T) this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), messageType);
} catch (Exception e) {
log.info("convert failed. str:{}, msgType:{}", str, messageType);
throw new RuntimeException("cannot convert message to " + messageType, e);
}
}
}
}
}
Expand Up @@ -17,12 +17,6 @@

package org.apache.rocketmq.spring.support;

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
Expand Down Expand Up @@ -50,7 +44,6 @@
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
Expand All @@ -66,6 +59,11 @@
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;

import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;

@SuppressWarnings("WeakerAccess")
public class DefaultRocketMQListenerContainer implements InitializingBean,
RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
Expand Down Expand Up @@ -315,8 +313,20 @@ public int getPhase() {
public void afterPropertiesSet() throws Exception {
initRocketMQPushConsumer();

this.messageType = getMessageType();
this.methodParameter = getMethodParameter();
if (rocketMQListener != null) {
this.messageType = RocketMQUtil.getMessageType(rocketMQListener, RocketMQListener.class);
} else {
this.messageType = RocketMQUtil.getMessageType(rocketMQReplyListener, RocketMQReplyListener.class);
}

if (rocketMQListener != null) {
this.methodParameter = RocketMQUtil.getMethodParameter(messageType,
rocketMQListener, "onMessage", messageConverter);
} else {
this.methodParameter = RocketMQUtil.getMethodParameter(messageType,
rocketMQReplyListener, "onMessage", messageConverter);
}

log.debug("RocketMQ messageType: {}", messageType);
}

Expand Down Expand Up @@ -391,9 +401,11 @@ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderly
private void handleMessage(
MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
if (rocketMQListener != null) {
rocketMQListener.onMessage(doConvertMessage(messageExt));
rocketMQListener.onMessage(RocketMQUtil.doConvertMessage(messageExt, messageType,
methodParameter, messageConverter, charset));
} else if (rocketMQReplyListener != null) {
Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
Object replyContent = rocketMQReplyListener.onMessage(RocketMQUtil.doConvertMessage(messageExt, messageType,
methodParameter, messageConverter, charset));
Message<?> message = MessageBuilder.withPayload(replyContent).build();

org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
Expand Down Expand Up @@ -455,90 +467,6 @@ private Message<?> doConvert(Object payload, MessageHeaders headers) {
return builder.build();
}

@SuppressWarnings("unchecked")
private Object doConvertMessage(MessageExt messageExt) {
if (Objects.equals(messageType, MessageExt.class) || Objects.equals(messageType, org.apache.rocketmq.common.message.Message.class)) {
return messageExt;
} else {
String str = new String(messageExt.getBody(), Charset.forName(charset));
if (Objects.equals(messageType, String.class)) {
return str;
} else {
// If msgType not string, use objectMapper change it.
try {
if (messageType instanceof Class) {
//if the messageType has not Generic Parameter
return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) messageType);
} else {
//if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
//we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter);
}
} catch (Exception e) {
log.info("convert failed. str:{}, msgType:{}", str, messageType);
throw new RuntimeException("cannot convert message to " + messageType, e);
}
}
}
}

private MethodParameter getMethodParameter() {
Class<?> targetClass;
if (rocketMQListener != null) {
targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
} else {
targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
}
Type messageType = this.getMessageType();
Class clazz = null;
if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) {
clazz = (Class) ((ParameterizedType) messageType).getRawType();
} else if (messageType instanceof Class) {
clazz = (Class) messageType;
} else {
throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
}
try {
final Method method = targetClass.getMethod("onMessage", clazz);
return new MethodParameter(method, 0);
} catch (NoSuchMethodException e) {
e.printStackTrace();
throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
}
}

private Type getMessageType() {
Class<?> targetClass;
if (rocketMQListener != null) {
targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
} else {
targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
}
Type matchedGenericInterface = null;
while (Objects.nonNull(targetClass)) {
Type[] interfaces = targetClass.getGenericInterfaces();
if (Objects.nonNull(interfaces)) {
for (Type type : interfaces) {
if (type instanceof ParameterizedType &&
(Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class) || Objects.equals(((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))) {
matchedGenericInterface = type;
break;
}
}
}
targetClass = targetClass.getSuperclass();
}
if (Objects.isNull(matchedGenericInterface)) {
return Object.class;
}

Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return actualTypeArguments[0];
}
return Object.class;
}

private void initRocketMQPushConsumer() throws MQClientException {
if (rocketMQListener == null && rocketMQReplyListener == null) {
throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
Expand Down

0 comments on commit 8eb32d5

Please sign in to comment.