Skip to content

Commit

Permalink
Add autoScaledReceiverQueueSize for PerformanceConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 committed Apr 1, 2022
1 parent 2b1b98b commit 99e6099
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,7 @@ protected void setCurrentReceiverQueueSize(int newSize) {
checkArgument(newSize > 0, "receiver queue size should larger than 0");
if (log.isDebugEnabled()) {
log.debug("[{}][{}] setMaxReceiverQueueSize={}, previous={}", topic, subscription,
getCurrentReceiverQueueSize(), newSize);
newSize, getCurrentReceiverQueueSize());
}
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, newSize);
resumeReceivingFromPausedConsumersIfNeeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
Expand Down Expand Up @@ -130,6 +133,10 @@ static class Arguments {
description = "Max total size of the receiver queue across partitions")
public int maxTotalReceiverQueueSizeAcrossPartitions = 50000;

@Parameter(names = {"-aq", "--auto-scaled-receiver-queue-size"},
description = "Enable autoScaledReceiverQueueSize")
public boolean autoScaledReceiverQueueSize = false;

@Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated")
public boolean replicatedSubscription = false;

Expand Down Expand Up @@ -342,6 +349,8 @@ public static void main(String[] args) throws Exception {
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
log.info("Starting Pulsar performance consumer with config: {}", w.writeValueAsString(arguments));

final Recorder qRecorder = arguments.autoScaledReceiverQueueSize
? new Recorder(arguments.receiverQueueSize, 5) : null;
final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
long startTime = System.nanoTime();
long testEndTime = startTime + (long) (arguments.testTime * 1e9);
Expand Down Expand Up @@ -394,6 +403,9 @@ public static void main(String[] args) throws Exception {
thread.interrupt();
}
}
if (qRecorder != null) {
qRecorder.recordValue(((ConsumerBase<?>) consumer).getTotalIncomingMessages());
}
messagesReceived.increment();
bytesReceived.add(msg.size());

Expand Down Expand Up @@ -500,7 +512,8 @@ public static void main(String[] args) throws Exception {
.autoAckOldestChunkedMessageOnQueueFull(arguments.autoAckOldestChunkedMessageOnQueueFull)
.enableBatchIndexAcknowledgment(arguments.batchIndexAck)
.poolMessages(arguments.poolMessages)
.replicateSubscriptionState(arguments.replicatedSubscription);
.replicateSubscriptionState(arguments.replicatedSubscription)
.autoScaledReceiverQueueSizeEnabled(arguments.autoScaledReceiverQueueSize);
if (arguments.maxPendingChunkedMessage > 0) {
consumerBuilder.maxPendingChunkedMessage(arguments.maxPendingChunkedMessage);
}
Expand Down Expand Up @@ -543,6 +556,7 @@ public static void main(String[] args) throws Exception {
long oldTime = System.nanoTime();

Histogram reportHistogram = null;
Histogram qHistogram = null;
HistogramLogWriter histogramLogWriter = null;

if (arguments.histogramFile != null) {
Expand Down Expand Up @@ -596,6 +610,28 @@ public static void main(String[] args) throws Exception {
reportHistogram.getValueAtPercentile(99), reportHistogram.getValueAtPercentile(99.9),
reportHistogram.getValueAtPercentile(99.99), reportHistogram.getMaxValue());

if (arguments.autoScaledReceiverQueueSize && log.isDebugEnabled() && qRecorder != null) {
qHistogram = qRecorder.getIntervalHistogram(qHistogram);
log.debug("ReceiverQueueUsage: cnt={},mean={}, min={},max={},25pct={},50pct={},75pct={}",
qHistogram.getTotalCount(), dec.format(qHistogram.getMean()),
qHistogram.getMinValue(), qHistogram.getMaxValue(),
qHistogram.getValueAtPercentile(25),
qHistogram.getValueAtPercentile(50),
qHistogram.getValueAtPercentile(75)
);
qHistogram.reset();
for (Future<Consumer<ByteBuffer>> future : futures) {
ConsumerBase<?> consumerBase = (ConsumerBase<?>) future.get();
log.debug("[{}] CurrentReceiverQueueSize={}", consumerBase.getConsumerName(),
consumerBase.getCurrentReceiverQueueSize());
if (consumerBase instanceof MultiTopicsConsumerImpl) {
for (ConsumerImpl<?> consumer : ((MultiTopicsConsumerImpl<?>) consumerBase).getConsumers()) {
log.debug("[{}] SubConsumer.CurrentReceiverQueueSize={}", consumer.getConsumerName(),
consumer.getCurrentReceiverQueueSize());
}
}
}
}
if (histogramLogWriter != null) {
histogramLogWriter.outputIntervalHistogram(reportHistogram);
}
Expand Down

0 comments on commit 99e6099

Please sign in to comment.