From 90bf886340215b1c45e43bf740c67317fdf9665e Mon Sep 17 00:00:00 2001 From: lk Date: Mon, 27 Feb 2023 12:56:26 +0800 Subject: [PATCH] [ISSUE #6192] Set a default value when UniqID is empty in Proxy (#6193) --- .../rocketmq/proxy/grpc/v2/common/GrpcConverter.java | 2 +- .../rocketmq/proxy/processor/ConsumerProcessor.java | 12 ++++++++++++ .../rocketmq/proxy/processor/ProducerProcessor.java | 4 ++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java index 96a214750a9..21526054a5f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java @@ -139,7 +139,7 @@ protected SystemProperties buildSystemProperties(MessageExt messageExt) { } // message_id - String uniqKey = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + String uniqKey = messageExt.getMsgId(); if (uniqKey != null) { systemPropertiesBuilder.setMessageId(uniqKey); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java index 37c2e54d6dd..d67f4b855d9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java @@ -34,6 +34,8 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageClientExt; +import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; @@ -144,6 +146,7 @@ public CompletableFuture popMessage( List messageExtList = new ArrayList<>(); for (MessageExt messageExt : popResult.getMsgFoundList()) { try { + fillUniqIDIfNeed(messageExt); String handleString = createHandle(messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getCommitLogOffset()); if (handleString == null) { log.error("[BUG] pop message from broker but handle is empty. requestHeader:{}, msg:{}", requestHeader, messageExt); @@ -193,6 +196,15 @@ public CompletableFuture popMessage( return FutureUtils.addExecutor(future, this.executor); } + private void fillUniqIDIfNeed(MessageExt messageExt) { + if (StringUtils.isBlank(MessageClientIDSetter.getUniqID(messageExt))) { + if (messageExt instanceof MessageClientExt) { + MessageClientExt clientExt = (MessageClientExt) messageExt; + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, clientExt.getOffsetMsgId()); + } + } + } + public CompletableFuture ackMessage( ProxyContext ctx, ReceiptHandle handle, diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java index 2fce78d31c5..749f9da2bec 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java @@ -28,6 +28,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageId; @@ -84,6 +85,9 @@ public CompletableFuture> sendMessage(ProxyContext ctx, QueueSe throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue"); } + for (Message msg : messageList) { + MessageClientIDSetter.setUniqID(msg); + } SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId()); future = this.serviceManager.getMessageService().sendMessage(