Is Your Feature Request Related to a Problem?
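A user can set a delay level with setDelayLevelWhenNextConsume, and that value reaches sendMessageBack as delayLevel. However, both calls to sendMessageBackAsNormalMessage(msg) below (the logical-queue branch and the fallback in the catch block) receive only msg, so the user-provided delayLevel is not passed along: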
    private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        boolean needRetry = true;
        try {
            if (brokerName != null && brokerName.startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)
                || mq != null && mq.getBrokerName().startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)) {
                needRetry = false;
                sendMessageBackAsNormalMessage(msg);
            } else {
                String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                    : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
                this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
                    this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
            }
        } catch (Throwable t) {
            log.error("Failed to send message back, consumerGroup={}, brokerName={}, mq={}, message={}",
                this.defaultMQPushConsumer.getConsumerGroup(), brokerName, mq, msg, t);
            if (needRetry) {
                sendMessageBackAsNormalMessage(msg);
            }
        } finally {
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
        }
    }
Describe the Solution You'd Like
When the message is sent back as a normal message, use the delayLevel the user provided.
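A minimal sketch of how the delay level could be chosen on the normal-message path. It is a standalone illustration, not the RocketMQ implementation: the class DelayLevelSketch and both methods are hypothetical names, the "3 + reconsumeTimes" default is an assumption about what the current fallback does, and treating only positive values as "user provided" is likewise an assumption.

```java
// Sketch only: stand-in logic with hypothetical names, not actual RocketMQ code.
final class DelayLevelSketch {

    // Assumed current behavior of the fallback path: the delay level is
    // derived from the reconsume count alone, ignoring the user's setting.
    static int defaultDelayLevel(int reconsumeTimes) {
        return 3 + reconsumeTimes;
    }

    // Proposed behavior: prefer the user's delayLevel when it is positive
    // (assumed to mean "client controls retry delay"); otherwise keep the
    // assumed default.
    static int resolveDelayLevel(int userDelayLevel, int reconsumeTimes) {
        return userDelayLevel > 0 ? userDelayLevel : defaultDelayLevel(reconsumeTimes);
    }

    public static void main(String[] args) {
        // User asked for level 7: the fallback message would keep level 7.
        System.out.println(resolveDelayLevel(7, 1));
        // User left the level at 0: the assumed default applies.
        System.out.println(resolveDelayLevel(0, 1));
    }
}
```

With something along these lines, sendMessageBackAsNormalMessage would need an extra delayLevel parameter so that both call sites in sendMessageBack can forward the value.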
Describe Alternatives You've Considered
None.
Additional Context
No response