diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 37dfe087e7fb4..5c7646921fbb4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -332,14 +332,19 @@ public Future sendMessages(final List entries, EntryBatch topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get()); } incrementUnackedMessages(unackedMessages); - msgOut.recordMultipleEvents(totalMessages, totalBytes); - msgOutCounter.add(totalMessages); - bytesOutCounter.add(totalBytes); - chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0); - - - return cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx, - entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch); + Future writeAndFlushPromise = + cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx, + entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch); + writeAndFlushPromise.addListener(status -> { + // only increment counters after the messages have been successfully written to the TCP/IP connection + if (status.isSuccess()) { + msgOut.recordMultipleEvents(totalMessages, totalBytes); + msgOutCounter.add(totalMessages); + bytesOutCounter.add(totalBytes); + chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0); + } + }); + return writeAndFlushPromise; } private void incrementUnackedMessages(int ackedMessages) {