From f714ef739aa0c80546a6df0ae08efd9d13c57005 Mon Sep 17 00:00:00 2001 From: zhh Date: Wed, 14 Aug 2019 15:45:27 +0800 Subject: [PATCH] Want to support reconsume part messages --- .../consumer/ConsumeMessageConcurrentlyService.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index 258e4dbf877..710272f2cac 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -268,9 +268,16 @@ public void processConsumeResult( this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: - ackIndex = -1; - this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), - consumeRequest.getMsgs().size()); + if(ackIndex>consumeRequest.getMsgs().size()){ + ackIndex=-1; + this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,consumeRequest.getMessageQueue().getTopic(), + consumeRequest.getMsgs().size()); + }else{ + ackIndex=ackIndex-2; + int reconsume=consumeRequest.getMsgs().size()-ackIndex-1; + this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), + reconsume); + } break; default: break;