diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java index 766ff64f6cc..c948fa3fa17 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java @@ -319,7 +319,7 @@ public CompletableFuture getMessageFromCacheAsync(CompositeQue } // if cache is miss, immediately pull messages - LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + + LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + "topic: {}, queue: {}, queue offset: {}, max message num: {}", mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 9fb1b2f01cb..d7d13d61e2e 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -147,6 +147,11 @@ public GetMessageResult getMessage(String group, String topic, int queueId, long public CompletableFuture getMessageAsync(String group, String topic, int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) { + // For system topic, force reading from local store + if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) { + return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter); + } + if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) { logger.trace("GetMessageAsync from current store, topic: {}, queue: {}, offset: {}", topic, queueId, offset); } else { @@ -158,6 +163,7 @@ public CompletableFuture getMessageAsync(String group, String return fetcher .getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter) .thenApply(result -> { + Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder() .put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_MESSAGE) .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic) @@ -166,8 +172,7 @@ public CompletableFuture getMessageAsync(String group, String TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes); if (result.getStatus() == GetMessageStatus.OFFSET_FOUND_NULL || - result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_ONE || - result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_BADLY) { + result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) { if (next.checkInStoreByConsumeOffset(topic, queueId, offset)) { TieredStoreMetricsManager.fallbackTotal.add(1, latencyAttributes); @@ -178,14 +183,8 @@ public CompletableFuture getMessageAsync(String group, String } } - // Fetch system topic data from the broker when using the force level. - if (result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) { - if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) { - return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); - } - } - if (result.getStatus() != GetMessageStatus.FOUND && + result.getStatus() != GetMessageStatus.NO_MATCHED_LOGIC_QUEUE && result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_ONE && result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_BADLY) { logger.warn("GetMessageAsync not found and message is not in next store, result: {}, " + @@ -206,10 +205,14 @@ public CompletableFuture getMessageAsync(String group, String if (minOffsetInQueue >= 0 && minOffsetInQueue < result.getMinOffset()) { result.setMinOffset(minOffsetInQueue); } - long maxOffsetInQueue = next.getMaxOffsetInQueue(topic, queueId); - if (maxOffsetInQueue >= 0 && maxOffsetInQueue > result.getMaxOffset()) { - result.setMaxOffset(maxOffsetInQueue); - } + + // In general, the local cq offset is slightly greater than the commit offset in read message, + // so there is no need to update the maximum offset to the local cq offset here, + // otherwise it will cause repeated consumption after next begin offset over commit offset. + + logger.trace("GetMessageAsync result, group: {}, topic: {}, queueId: {}, offset: {}, count:{}, {}", + group, topic, queueId, offset, maxMsgNums, result); + return result; }).exceptionally(e -> { logger.error("GetMessageAsync from tiered store failed", e);