Skip to content

Commit

Permalink
[ISSUE #6192] Set a default value when UniqID is empty in Proxy (#6193)
Browse files Browse the repository at this point in the history
  • Loading branch information
xdkxlk committed Feb 27, 2023
1 parent 2f93cbe commit 90bf886
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ protected SystemProperties buildSystemProperties(MessageExt messageExt) {
}

// message_id
String uniqKey = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
String uniqKey = messageExt.getMsgId();
if (uniqKey != null) {
systemPropertiesBuilder.setMessageId(uniqKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
Expand Down Expand Up @@ -144,6 +146,7 @@ public CompletableFuture<PopResult> popMessage(
List<MessageExt> messageExtList = new ArrayList<>();
for (MessageExt messageExt : popResult.getMsgFoundList()) {
try {
fillUniqIDIfNeed(messageExt);
String handleString = createHandle(messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getCommitLogOffset());
if (handleString == null) {
log.error("[BUG] pop message from broker but handle is empty. requestHeader:{}, msg:{}", requestHeader, messageExt);
Expand Down Expand Up @@ -193,6 +196,15 @@ public CompletableFuture<PopResult> popMessage(
return FutureUtils.addExecutor(future, this.executor);
}

private void fillUniqIDIfNeed(MessageExt messageExt) {
if (StringUtils.isBlank(MessageClientIDSetter.getUniqID(messageExt))) {
if (messageExt instanceof MessageClientExt) {
MessageClientExt clientExt = (MessageClientExt) messageExt;
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, clientExt.getOffsetMsgId());
}
}
}

public CompletableFuture<AckResult> ackMessage(
ProxyContext ctx,
ReceiptHandle handle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageId;
Expand Down Expand Up @@ -84,6 +85,9 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSe
throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue");
}

for (Message msg : messageList) {
MessageClientIDSetter.setUniqID(msg);
}
SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId());

future = this.serviceManager.getMessageService().sendMessage(
Expand Down

0 comments on commit 90bf886

Please sign in to comment.