diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java index c829be28e596a..b8e7c36133961 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java @@ -60,5 +60,7 @@ public class AggregatedSubscriptionStats { double msgDropRate; + long consumersCount; + public Map consumerStat = new HashMap<>(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index a3a0fcda44512..8c9f8e3271548 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -126,6 +126,7 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp; subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp; + subsStats.consumersCount = subscriptionStats.consumers.size(); subscriptionStats.consumers.forEach(cStats -> { stats.consumersCount++; subsStats.unackedMessages += cStats.unackedMessages; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 35dbdba274ac0..b45a09f06f179 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -268,6 +268,8 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate", subsStats.msgDropRate, splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_consumers_count", + subsStats.consumersCount, splitTopicAndPartitionIndexLabel); subsStats.consumerStat.forEach((c, consumerStats) -> { metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index b62a7529fe389..2a6754bfb1c37 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -527,6 +527,7 @@ public void testNonPersistentSubMetrics() throws Exception { assertTrue(metrics.containsKey("pulsar_out_messages_total")); assertTrue(metrics.containsKey("pulsar_subscription_last_expire_timestamp")); assertTrue(metrics.containsKey("pulsar_subscription_msg_drop_rate")); + assertTrue(metrics.containsKey("pulsar_subscription_consumers_count")); } @Test