diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index 258e4dbf877..710272f2cac 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -268,9 +268,16 @@ public void processConsumeResult( this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: - ackIndex = -1; - this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), - consumeRequest.getMsgs().size()); + if(ackIndex>consumeRequest.getMsgs().size()){ + ackIndex=-1; + this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,consumeRequest.getMessageQueue().getTopic(), + consumeRequest.getMsgs().size()); + }else{ + ackIndex=ackIndex-2; + int reconsume=consumeRequest.getMsgs().size()-ackIndex-1; + this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), + reconsume); + } break; default: break;