From 2bc933ee71421f60879c99c7888c96e95d5c9386 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 11 Aug 2022 07:12:21 +0300 Subject: [PATCH] [fix][broker] Increment topic stats outbound message counters after messages have been written to the TCP/IP connection (#17043) --- .../pulsar/broker/service/Consumer.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 37dfe087e7fb4..5c7646921fbb4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -332,14 +332,19 @@ public Future sendMessages(final List entries, EntryBatch topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get()); } incrementUnackedMessages(unackedMessages); - msgOut.recordMultipleEvents(totalMessages, totalBytes); - msgOutCounter.add(totalMessages); - bytesOutCounter.add(totalBytes); - chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0); - - - return cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx, - entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch); + Future writeAndFlushPromise = + cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx, + entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch); + writeAndFlushPromise.addListener(status -> { + // only increment counters after the messages have been successfully written to the TCP/IP connection + if (status.isSuccess()) { + msgOut.recordMultipleEvents(totalMessages, totalBytes); + msgOutCounter.add(totalMessages); + bytesOutCounter.add(totalBytes); + chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0); + } + }); + return writeAndFlushPromise; } private void incrementUnackedMessages(int ackedMessages) {