Skip to content

Commit

Permalink
non-persistent topic metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
gavingaozhangmin committed Jan 19, 2022
1 parent 4731bfe commit aca0eec
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,7 @@ public class AggregatedSubscriptionStats {

long totalMsgExpired;

double msgDropRate;

public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
Expand Down Expand Up @@ -141,7 +142,6 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
stats.managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
}

TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize, false);
stats.msgInCounter = tStatus.msgInCounter;
stats.bytesInCounter = tStatus.bytesInCounter;
Expand Down Expand Up @@ -174,37 +174,73 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
}
});

tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
stats.subscriptionsCount++;
stats.msgBacklog += subscriptionStats.msgBacklog;

AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
subsStats.msgRateOut += cStats.msgRateOut;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
if (topic instanceof PersistentTopic) {
tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
stats.subscriptionsCount++;
stats.msgBacklog += subscriptionStats.msgBacklog;

AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
subsStats.msgRateOut += cStats.msgRateOut;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
});
stats.rateOut += subsStats.msgRateOut;
stats.throughputOut += subsStats.msgThroughputOut;
});
stats.rateOut += subsStats.msgRateOut;
stats.throughputOut += subsStats.msgThroughputOut;
});
} else {
((NonPersistentTopicStatsImpl) tStatus).getNonPersistentSubscriptions()
.forEach((subName, subscriptionStats) -> {
stats.subscriptionsCount++;
stats.msgBacklog += subscriptionStats.getMsgBacklog();

AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
subsStats.msgBacklog = subscriptionStats.getMsgBacklog();
subsStats.msgDelayed = subscriptionStats.getMsgDelayed();
subsStats.msgRateExpired = subscriptionStats.getMsgRateExpired();
subsStats.totalMsgExpired = subscriptionStats.getTotalMsgExpired();
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
subsStats.lastExpireTimestamp = subscriptionStats.getLastExpireTimestamp();
subsStats.lastAckedTimestamp = subscriptionStats.getLastAckedTimestamp();
subsStats.lastConsumedFlowTimestamp = subscriptionStats.getLastConsumedFlowTimestamp();
subsStats.lastConsumedTimestamp = subscriptionStats.getLastConsumedTimestamp();
subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.getLastMarkDeleteAdvancedTimestamp();
subsStats.msgDropRate = subscriptionStats.getMsgDropRate();
subscriptionStats.getConsumers().forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.getUnackedMessages();
subsStats.msgRateRedeliver += cStats.getMsgRateRedeliver();
subsStats.msgRateOut += cStats.getMsgRateOut();
subsStats.msgThroughputOut += cStats.getMsgThroughputOut();
subsStats.bytesOutCounter += cStats.getBytesOutCounter();
subsStats.msgOutCounter += cStats.getMsgOutCounter();
if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.isBlockedConsumerOnUnackedMsgs()) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
});
stats.rateOut += subsStats.msgRateOut;
stats.throughputOut += subsStats.msgThroughputOut;
});
}

// Consumer stats can be a lot if a subscription has many consumers
if (includeConsumerMetrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate",
subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,45 @@ public void testPerTopicExpiredStat() throws Exception {

}

@Test
public void testNonPersistentSubMetrics() throws Exception {
Producer<byte[]> p1 =
pulsarClient.newProducer().topic("non-persistent://my-property/use/my-ns/my-topic1").create();

Consumer<byte[]> c1 = pulsarClient.newConsumer()
.topic("non-persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("test")
.subscribe();

final int messages = 100;

for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
}

for (int i = 0; i < messages; i++) {
c1.acknowledge(c1.receive());
}

ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
String metricsStr = statsOut.toString();
Multimap<String, Metric> metrics = parseMetrics(metricsStr);
assertTrue(metrics.containsKey("pulsar_subscription_back_log"));
assertTrue(metrics.containsKey("pulsar_subscription_back_log_no_delayed"));
assertTrue(metrics.containsKey("pulsar_subscription_msg_throughput_out"));
assertTrue(metrics.containsKey("pulsar_throughput_out"));
assertTrue(metrics.containsKey("pulsar_subscription_msg_rate_redeliver"));
assertTrue(metrics.containsKey("pulsar_subscription_unacked_messages"));
assertTrue(metrics.containsKey("pulsar_subscription_blocked_on_unacked_messages"));
assertTrue(metrics.containsKey("pulsar_subscription_msg_rate_out"));
assertTrue(metrics.containsKey("pulsar_out_bytes_total"));
assertTrue(metrics.containsKey("pulsar_out_messages_total"));
assertTrue(metrics.containsKey("pulsar_subscription_last_expire_timestamp"));
assertTrue(metrics.containsKey("pulsar_subscription_msg_drop_rate"));
}

@Test
public void testPerNamespaceStats() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Expand Down

0 comments on commit aca0eec

Please sign in to comment.