Skip to content

Commit

Permalink
Add pulsar_subscription_consumers_count metric (#15042)
Browse files Browse the repository at this point in the history
Fixes #15032

### Modifications

Added metric `pulsar_subscription_consumers_count`
  • Loading branch information
cbornet committed Apr 19, 2022
1 parent 32d7a51 commit 789fa50
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,7 @@ public class AggregatedSubscriptionStats {

double msgDropRate;

long consumersCount;

public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl
subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
subsStats.consumersCount = subscriptionStats.consumers.size();
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate",
subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_consumers_count",
subsStats.consumersCount, splitTopicAndPartitionIndexLabel);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ public void testNonPersistentSubMetrics() throws Exception {
assertTrue(metrics.containsKey("pulsar_out_messages_total"));
assertTrue(metrics.containsKey("pulsar_subscription_last_expire_timestamp"));
assertTrue(metrics.containsKey("pulsar_subscription_msg_drop_rate"));
assertTrue(metrics.containsKey("pulsar_subscription_consumers_count"));
}

@Test
Expand Down

0 comments on commit 789fa50

Please sign in to comment.