From 110f06aa09b96d02e32d4db23fa0c39ea53211db Mon Sep 17 00:00:00 2001 From: "kaiyi.lk" Date: Fri, 4 Aug 2023 13:51:23 +0800 Subject: [PATCH] [ISSUE 7117] check message is in memory or not when init consumer offset for pop --- .../broker/processor/AckMessageProcessor.java | 1 - .../broker/processor/PopMessageProcessor.java | 40 ++++++++++++------- .../apache/rocketmq/common/BrokerConfig.java | 9 +++++ .../service/route/TopicRouteService.java | 2 +- 4 files changed, 36 insertions(+), 16 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 2140aa881cd..687811409e0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -308,7 +308,6 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) { POP_LOGGER.error("put ack msg error:" + putMessageResult); } - System.out.printf("put ack to store %s", ackMsg); PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus()); brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 53e17256140..441f7de08a1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -639,20 +639,7 @@ private long getPopOffset(String topic, String group, int queueId, int initMode, long offset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, queueId); if (offset < 0) { - if (ConsumeInitMode.MIN == initMode) { - offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId); - } else { - // pop last one,then commit offset. - offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1; - // max & no consumer offset - if (offset < 0) { - offset = 0; - } - if (init) { - this.brokerController.getConsumerOffsetManager().commitOffset( - "getPopOffset", group, topic, queueId, offset); - } - } + offset = this.getInitOffset(topic, group, queueId, initMode, init); } if (checkResetOffset) { @@ -670,6 +657,31 @@ private long getPopOffset(String topic, String group, int queueId, int initMode, } } + private long getInitOffset(String topic, String group, int queueId, int initMode, boolean init) { + long offset; + if (ConsumeInitMode.MIN == initMode) { + return this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId); + } else { + if (this.brokerController.getBrokerConfig().isInitPopOffsetByCheckMsgInMem() && + this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId) <= 0 && + this.brokerController.getMessageStore().checkInMemByConsumeOffset(topic, queueId, 0, 1)) { + offset = 0; + } else { + // pop last one,then commit offset. + offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1; + // max & no consumer offset + if (offset < 0) { + offset = 0; + } + } + if (init) { + this.brokerController.getConsumerOffsetManager().commitOffset( + "getPopOffset", group, topic, queueId, offset); + } + } + return offset; + } + public final MessageExtBrokerInner buildCkMsg(final PopCheckPoint ck, final int reviveQid) { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 02c692e2b2a..a815636b185 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -222,6 +222,7 @@ public class BrokerConfig extends BrokerIdentity { private int popCkOffsetMaxQueueSize = 20000; private boolean enablePopBatchAck = false; private boolean enableNotifyAfterPopOrderLockRelease = true; + private boolean initPopOffsetByCheckMsgInMem = true; private boolean realTimeNotifyConsumerChange = true; @@ -1264,6 +1265,14 @@ public void setEnableNotifyAfterPopOrderLockRelease(boolean enableNotifyAfterPop this.enableNotifyAfterPopOrderLockRelease = enableNotifyAfterPopOrderLockRelease; } + public boolean isInitPopOffsetByCheckMsgInMem() { + return initPopOffsetByCheckMsgInMem; + } + + public void setInitPopOffsetByCheckMsgInMem(boolean initPopOffsetByCheckMsgInMem) { + this.initPopOffsetByCheckMsgInMem = initPopOffsetByCheckMsgInMem; + } + public boolean isRealTimeNotifyConsumerChange() { return realTimeNotifyConsumerChange; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index b6b14faa492..e012a5465a4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -133,7 +133,7 @@ protected static boolean isTopicRouteValid(TopicRouteData routeData) { protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) { if (isTopicRouteValid(topicRouteData)) { MessageQueueView tmp = new MessageQueueView(topic, topicRouteData); - log.info("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); + log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); return tmp; } return MessageQueueView.WRAPPED_EMPTY_QUEUE;