diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index ec7ebfd6848e0..0b64a68c9464c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -99,6 +99,9 @@ void updateStats(TopicStats stats) { replStats.msgThroughputIn += as.msgThroughputIn; replStats.msgThroughputOut += as.msgThroughputOut; replStats.replicationBacklog += as.replicationBacklog; + replStats.msgRateExpired += as.msgRateExpired; + replStats.connectedCount += as.connectedCount; + replStats.replicationDelayInSeconds += as.replicationDelayInSeconds; }); stats.subscriptionStats.forEach((n, as) -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java index ac0b41a53d5ce..ca92d557143e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.stats.prometheus; public class AggregatedReplicationStats { + + /** Total rate of messages received from the remote cluster (msg/s). */ public double msgRateIn; /** Total throughput received from the remote cluster. bytes/s */ @@ -30,7 +32,16 @@ public class AggregatedReplicationStats { /** Total throughput delivered to the replication-subscriber. bytes/s */ public double msgThroughputOut; - /** Number of messages pending to be replicated to remote cluster */ + /** Total rate of messages expired (msg/s). 
*/ + public double msgRateExpired; + + /** Number of messages pending to be replicated to remote cluster. */ public long replicationBacklog; + /** The count of replication-subscriber up and running to replicate to remote cluster. */ + public long connectedCount; + + /** Time in seconds from the time a message was produced to the time when it is about to be replicated. */ + public long replicationDelayInSeconds; + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index cf89ea2b24471..fcdbbcebc2d30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -196,6 +196,11 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include aggReplStats.msgRateOut += replStats.msgRateOut; aggReplStats.msgThroughputOut += replStats.msgThroughputOut; aggReplStats.replicationBacklog += replStats.replicationBacklog; + aggReplStats.msgRateIn += replStats.msgRateIn; + aggReplStats.msgThroughputIn += replStats.msgThroughputIn; + aggReplStats.msgRateExpired += replStats.msgRateExpired; + aggReplStats.connectedCount += replStats.connected ? 
1 : 0; + aggReplStats.replicationDelayInSeconds += replStats.replicationDelayInSeconds; }); } @@ -309,6 +314,12 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl replStats.msgThroughputOut); metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_backlog", remoteCluster, replStats.replicationBacklog); + metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_connected_count", remoteCluster, + replStats.connectedCount); + metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_expired", remoteCluster, + replStats.msgRateExpired); + metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_delay_in_seconds", + remoteCluster, replStats.replicationDelayInSeconds); }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 1d39d588127a3..5bb37ffc36bf3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -215,6 +215,12 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin replStats.msgThroughputOut); metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster, replStats.replicationBacklog); + metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_connected_count", + remoteCluster, replStats.connectedCount); + metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_expired", + remoteCluster, replStats.msgRateExpired); + metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_delay_in_seconds", + remoteCluster, replStats.replicationDelayInSeconds); }); } diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java index c01173ee2511c..07c8f2446cee9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java @@ -48,6 +48,9 @@ public void testSimpleAggregation() throws Exception { replStats1.msgRateOut = 2.0; replStats1.msgThroughputOut = 256.0; replStats1.replicationBacklog = 1; + replStats1.connectedCount = 0; + replStats1.msgRateExpired = 3.0; + replStats1.replicationDelayInSeconds = 20; topicStats1.replicationStats.put(namespace, replStats1); AggregatedSubscriptionStats subStats1 = new AggregatedSubscriptionStats(); @@ -76,6 +79,9 @@ public void testSimpleAggregation() throws Exception { replStats2.msgRateOut = 10.5; replStats2.msgThroughputOut = 1536.0; replStats2.replicationBacklog = 99; + replStats2.connectedCount = 1; + replStats2.msgRateExpired = 3.0; + replStats2.replicationDelayInSeconds = 20; topicStats2.replicationStats.put(namespace, replStats2); AggregatedSubscriptionStats subStats2 = new AggregatedSubscriptionStats(); @@ -109,6 +115,9 @@ public void testSimpleAggregation() throws Exception { assertEquals(nsReplStats.msgRateOut, 12.5); assertEquals(nsReplStats.msgThroughputOut, 1792.0); assertEquals(nsReplStats.replicationBacklog, 100); + assertEquals(nsReplStats.connectedCount, 1); + assertEquals(nsReplStats.msgRateExpired, 6.0); + assertEquals(nsReplStats.replicationDelayInSeconds, 40); AggregatedSubscriptionStats nsSubStats = nsStats.subscriptionStats.get(namespace); assertNotNull(nsSubStats); diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md index 02780bc887a18..08f9a35ae674b 100644 --- a/site2/docs/reference-metrics.md +++ b/site2/docs/reference-metrics.md 
@@ -149,6 +149,9 @@ All the replication metrics are also labelled with `remoteCluster=${pulsar_remot | pulsar_replication_throughput_in | Gauge | The total throughput of the namespace replicating from remote cluster (bytes/second). | | pulsar_replication_throughput_out | Gauge | The total throughput of the namespace replicating to remote cluster (bytes/second). | | pulsar_replication_backlog | Gauge | The total backlog of the namespace replicating to remote cluster (messages). | +| pulsar_replication_rate_expired | Gauge | Total rate of messages expired (messages/second). | +| pulsar_replication_connected_count | Gauge | The count of replication-subscriber up and running to replicate to remote cluster. | +| pulsar_replication_delay_in_seconds | Gauge | Time in seconds from the time a message was produced to the time when it is about to be replicated. | ### Topic metrics