Skip to content

Commit

Permalink
[pulsar-client] Fix pending queue-size stats for batch messages
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Nov 10, 2021
1 parent ffccd46 commit 8ba55d8
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
Expand Up @@ -33,9 +33,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.gson.Gson;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand All @@ -46,6 +46,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;

Expand Down Expand Up @@ -78,6 +79,11 @@ public Object[][] ackTimeoutSecProvider() {
return new Object[][] { { 0, 0 }, { 0, 2 }, { 1000, 0 }, { 1000, 2 } };
}

@DataProvider(name = "batchingEnabled")
public Object[][] batchingEnabled() {
return new Object[][] { { true }, { false } };
}

@Test(dataProvider = "batch_with_timeout")
public void testSyncProducerAndConsumer(int batchMessageDelayMs, int ackTimeoutSec) throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down Expand Up @@ -427,14 +433,14 @@ public void testAddBrokerLatencyStats() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testProducerPendingQueueSizeStats() throws Exception {
@Test(dataProvider = "batchingEnabled")
public void testProducerPendingQueueSizeStats(boolean batchingEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic("persistent://my-property/tp1/my-ns/my-topic1");

@Cleanup
Producer<byte[]> producer = producerBuilder.enableBatching(false).create();
Producer<byte[]> producer = producerBuilder.enableBatching(batchingEnabled).create();

stopBroker();

Expand All @@ -443,6 +449,8 @@ public void testProducerPendingQueueSizeStats() throws Exception {
String message = "my-message-" + i;
producer.sendAsync(message.getBytes());
}
Awaitility.await().timeout(2, TimeUnit.MINUTES)
.until(() -> producer.getStats().getPendingQueueSize() == numMessages);
assertEquals(producer.getStats().getPendingQueueSize(), numMessages);
}
}
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -1924,7 +1925,14 @@ public String getConnectedSince() {
}

public int getPendingQueueSize() {
return pendingMessages.size();
if (!isBatchMessagingEnabled()) {
return pendingMessages.size();
}
MutableInt size = new MutableInt(0);
pendingMessages.forEach(op -> {
size.add(Math.max(op.numMessagesInBatch, 1));
});
return size.getValue();
}

@Override
Expand Down

0 comments on commit 8ba55d8

Please sign in to comment.