Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -333,4 +333,62 @@ public void testConsumerMetrics() throws Exception {
assertCounterValue(metrics, "pulsar.client.consumer.closed", 1, nsAttrs);
assertCounterValue(metrics, "pulsar.client.connection.closed", 1, Attributes.empty());
}
@Test
public void testConsumerAvailablePermitsMetrics() throws Exception {
String topic = newTopicName();
final int recvQueueSize = 100;

PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.openTelemetry(otel)
.build();

@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic)
.create();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("my-sub")
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(recvQueueSize)
.subscribe();

for (int i = 0; i < recvQueueSize; i++) {
producer.send("Hello");
}

Thread.sleep(1000);

Attributes nsAttrs = Attributes.builder()
.put("pulsar.tenant", "my-property")
.put("pulsar.namespace", "my-property/my-ns")
.put("pulsar.subscription", "my-sub")
.build();
var metrics = collectMetrics();

assertCounterValue(metrics, "pulsar.client.consumer.available_permits", 0, nsAttrs);
assertCounterValue(metrics, "pulsar.client.consumer.permit.remaining", recvQueueSize/2, nsAttrs);

Message<String> msg1 = consumer.receive();
metrics = collectMetrics();
consumer.acknowledge(msg1);
assertCounterValue(metrics, "pulsar.client.consumer.available_permits", 1, nsAttrs);
assertCounterValue(metrics, "pulsar.client.consumer.permit.remaining", recvQueueSize/2-1, nsAttrs);
assertCounterValue(metrics, "pulsar.client.consumer.permit.limit", recvQueueSize/2, nsAttrs);

// clear the queue
while (true) {
Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg == null) {
break;
}
}
metrics = collectMetrics();
assertCounterValue(metrics, "pulsar.client.consumer.available_permits", 0, nsAttrs);

client.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final Counter bytesReceivedCounter;
private final UpDownCounter messagesPrefetchedGauge;
private final UpDownCounter bytesPrefetchedGauge;
private final UpDownCounter messageAvailablePermitsGauge;
private UpDownCounter messagePermitRemainingGauge;
private UpDownCounter messagePermitLimitGauge;
private final Counter consumersOpenedCounter;
private final Counter consumersClosedCounter;
private final Counter consumerAcksCounter;
Expand Down Expand Up @@ -421,6 +424,17 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
"The number of messages currently sitting in the consumer receive queue", topic, attrs);
bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.size", Unit.Bytes,
"The total size in bytes of messages currently sitting in the consumer receive queue", topic, attrs);
messageAvailablePermitsGauge = ip.newUpDownCounter("pulsar.client.consumer.available_permits",
Unit.Messages, "The number of consumer available permits", topic, attrs);
messagePermitRemainingGauge = ip.newUpDownCounter("pulsar.client.consumer.permit.remaining",
Unit.Messages, "The number of consumer permit remaining", topic, attrs, measurement -> {
measurement.record(getCurrentReceiverQueueSize() / 2 - AVAILABLE_PERMITS_UPDATER.get(this),
messagePermitRemainingGauge.getAttributes());
});
messagePermitLimitGauge = ip.newUpDownCounter("pulsar.client.consumer.permit.limit",
Unit.Messages, "The number of consumer permit remaining", topic, attrs, measurement-> {
measurement.record(getCurrentReceiverQueueSize() / 2 , messagePermitLimitGauge.getAttributes());
});

consumerAcksCounter = ip.newCounter("pulsar.client.consumer.message.ack", Unit.Messages,
"The number of acknowledged messages", topic, attrs);
Expand Down Expand Up @@ -1149,6 +1163,8 @@ public CompletableFuture<Void> closeAsync() {
}
}));
}
messagePermitRemainingGauge.close();
messagePermitLimitGauge.close();
return FutureUtil.waitForAll(closeFutures);
}

Expand Down Expand Up @@ -1842,8 +1858,10 @@ void increaseAvailablePermits(ClientCnx currentCnx) {

protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
messageAvailablePermitsGauge.add(delta);
while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
messageAvailablePermitsGauge.subtract(available);
sendFlowPermitsToBroker(currentCnx, available);
break;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import java.util.function.Consumer;
import org.apache.pulsar.PulsarVersion;

public class InstrumentProvider {
Expand All @@ -49,7 +51,11 @@ public Counter newCounter(String name, Unit unit, String description, String top

public UpDownCounter newUpDownCounter(String name, Unit unit, String description, String topic,
Attributes attributes) {
return new UpDownCounter(meter, name, unit, description, topic, attributes);
return new UpDownCounter(meter, name, unit, description, topic, attributes, null);
}
public UpDownCounter newUpDownCounter(String name, Unit unit, String description, String topic,
Attributes attributes, Consumer<ObservableLongMeasurement> callback) {
return new UpDownCounter(meter, name, unit, description, topic, attributes, callback);
}

public LatencyHistogram newLatencyHistogram(String name, String description, String topic, Attributes attributes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,21 @@
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.LongUpDownCounterBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import java.util.function.Consumer;
import lombok.Getter;

public class UpDownCounter {
public class UpDownCounter implements AutoCloseable{

private final LongUpDownCounter counter;
private LongUpDownCounter counter;
private ObservableLongUpDownCounter observableCounter;

@Getter
private final Attributes attributes;

UpDownCounter(Meter meter, String name, Unit unit, String description, String topic, Attributes attributes) {
UpDownCounter(Meter meter, String name, Unit unit, String description, String topic, Attributes attributes,
Consumer<ObservableLongMeasurement> callback) {
LongUpDownCounterBuilder builder = meter.upDownCounterBuilder(name)
.setDescription(description)
.setUnit(unit.toString());
Expand All @@ -46,7 +54,11 @@ public class UpDownCounter {
attributes = getTopicAttributes(topic, attributes);
}

this.counter = builder.build();
if (callback != null) {
observableCounter = builder.buildWithCallback(callback);
} else {
this.counter = builder.build();
}
this.attributes = attributes;
}

Expand All @@ -65,4 +77,11 @@ public void add(long delta) {
public void subtract(long diff) {
add(-diff);
}

@Override
public void close(){
if (observableCounter != null) {
observableCounter.close();
}
}
}