From a70818e150ed6578189a48e382fecbf7540828cf Mon Sep 17 00:00:00 2001 From: guyinyou <1094592600@qq.com> Date: Wed, 6 Jul 2022 12:56:26 +0800 Subject: [PATCH 1/2] Fix autoBatch not compatible with batchConsumeQueue --- .../org/apache/rocketmq/client/producer/ProduceAccumulator.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java index 6c5ea8531dc3..500631186f84 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageBatch; +import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageQueue; @@ -409,6 +410,7 @@ private MessageBatch batch() { messageBatch.setWaitStoreMsgOK(this.aggregateKey.waitStoreMsgOK); messageBatch.setKeys(this.keys); messageBatch.setTags(this.aggregateKey.tag); + MessageClientIDSetter.setUniqID(messageBatch); messageBatch.setBody(MessageDecoder.encodeMessages(this.messages)); return messageBatch; } From b82ee8334fb89ee2a1e6f761a3ef2564b9309cdc Mon Sep 17 00:00:00 2001 From: guyinyou <1094592600@qq.com> Date: Thu, 7 Jul 2022 11:35:10 +0800 Subject: [PATCH 2/2] Fix the problem of abnormal statistics under Bcq --- store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 2 +- .../java/org/apache/rocketmq/store/DefaultMessageStore.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index be013e9d456c..7baa9c6e6bc1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -907,7 +907,7 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1); + storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum()); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes()); return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 0da12a2321ba..05b486d8d785 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -804,7 +804,7 @@ public GetMessageResult getMessage(final String group, final String topic, final continue; } - this.storeStatsService.getGetMessageTransferedMsgCount().add(1); + this.storeStatsService.getGetMessageTransferedMsgCount().add(cqUnit.getBatchNum()); getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum()); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; @@ -2457,7 +2457,7 @@ private void doReput() { if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { DefaultMessageStore.this.storeStatsService - .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1); + .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(dispatchRequest.getBatchSize()); DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) .add(dispatchRequest.getMsgSize());