Skip to content

Commit

Permalink
Export Prometheus metric for messageTTL (#8871)
Browse files Browse the repository at this point in the history
Fixes #8573

Some users want to know how many messages have expired and when. Currently there are too few TTL-related metrics, so message TTL behaves like an unobservable black box.

Add `totalMsgExpired`, `lastExpireTimestamp`, and `msgRateExpired` to the Prometheus metrics.

PrometheusMetricsTest.java

(cherry picked from commit 060e35b)
  • Loading branch information
315157973 authored and codelipenghui committed Jan 6, 2021
1 parent bae9a12 commit 63b3741
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 11 deletions.
Expand Up @@ -20,7 +20,7 @@

import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
Expand All @@ -40,6 +40,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
private final String subName;
private final String topicName;
private final Rate msgExpired;
private final LongAdder totalMsgExpired;
private final boolean autoSkipNonRecoverableData;
private final PersistentSubscription subscription;

Expand All @@ -56,6 +57,7 @@ public PersistentMessageExpiryMonitor(String topicName, String subscriptionName,
this.subName = subscriptionName;
this.subscription = subscription;
this.msgExpired = new Rate();
this.totalMsgExpired = new LongAdder();
// check to avoid test failures
this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null
&& this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
Expand Down Expand Up @@ -97,13 +99,18 @@ public double getMessageExpiryRate() {
return msgExpired.getRate();
}

/**
 * Reports how many messages this monitor has counted as expired so far.
 *
 * @return the current sum of the {@code totalMsgExpired} adder, which is increased by the
 *         number of expired messages each time a mark-delete completes
 */
public long getTotalMessageExpired() {
    final long expiredSoFar = totalMsgExpired.sum();
    return expiredSoFar;
}

private static final Logger log = LoggerFactory.getLogger(PersistentMessageExpiryMonitor.class);

private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false);
msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */);
totalMsgExpired.add(numMessagesExpired);
updateRates();
// If the subscription is a Key_Shared subscription, we should trigger message dispatch.
if (subscription != null && subscription.getType() == PulsarApi.CommandSubscribe.SubType.Key_Shared) {
Expand Down
Expand Up @@ -1025,6 +1025,7 @@ public SubscriptionStats getStats(Boolean getPreciseBacklog) {
subStats.msgBacklog = getNumberOfEntriesInBacklog(getPreciseBacklog);
subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
subStats.totalMsgExpired = expiryMonitor.getTotalMessageExpired();
subStats.isReplicated = isReplicated();
subStats.isDurable = cursor.isDurable();
if (getType() == SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) {
Expand Down
Expand Up @@ -45,5 +45,11 @@ public class AggregatedSubscriptionStats {

long bytesOutCounter;

// Copied from SubscriptionStats.lastExpireTimestamp; exported as
// pulsar_subscription_last_expire_timestamp.
long lastExpireTimestamp;

// Copied from SubscriptionStats.msgRateExpired; exported as
// pulsar_subscription_msg_rate_expired.
double msgRateExpired;

// Copied from SubscriptionStats.totalMsgExpired; exported as
// pulsar_subscription_total_msg_expired.
long totalMsgExpired;

public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
}
Expand Up @@ -135,6 +135,9 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
Expand Down
Expand Up @@ -137,16 +137,32 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());

stats.subscriptionStats.forEach((n, subsStats) -> {
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log", subsStats.msgBacklog);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed", subsStats.msgBacklogNoDelayed);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed", subsStats.msgDelayed);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver", subsStats.msgRateRedeliver);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages", subsStats.unackedMessages);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", subsStats.msgRateOut);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut);
metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", subsStats.bytesOutCounter);
metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total", subsStats.msgOutCounter);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log",
subsStats.msgBacklog);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed",
subsStats.msgBacklogNoDelayed);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed",
subsStats.msgDelayed);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver",
subsStats.msgRateRedeliver);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages",
subsStats.unackedMessages);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages",
subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
subsStats.msgRateOut);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
subsStats.msgThroughputOut);
metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
subsStats.bytesOutCounter);
metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total",
subsStats.msgOutCounter);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
subsStats.lastExpireTimestamp);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
subsStats.msgRateExpired);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
subsStats.totalMsgExpired);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_unacked_messages", consumerStats.unackedMessages);
Expand Down
Expand Up @@ -25,19 +25,29 @@
import static org.testng.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.math.RoundingMode;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -185,6 +195,100 @@ public void testPerTopicStats() throws Exception {
c2.close();
}

/**
 * Verifies that the three per-subscription expiry metrics
 * ({@code pulsar_subscription_last_expire_timestamp},
 * {@code pulsar_subscription_msg_rate_expired} and
 * {@code pulsar_subscription_total_msg_expired}) are exported once per topic with the
 * expected {@code topic}/{@code namespace} tags and with values matching the broker state.
 *
 * @throws Exception if topic setup, the expiry check, or metrics generation fails
 */
@Test
public void testPerTopicExpiredStat() throws Exception {
    String ns = "prop/ns-abc1";
    admin.namespaces().createNamespace(ns);
    String topic1 = "persistent://" + ns + "/testPerTopicExpiredStat1";
    String topic2 = "persistent://" + ns + "/testPerTopicExpiredStat2";
    // The assertions below expect the metric samples in exactly this order.
    List<String> topicList = Arrays.asList(topic2, topic1);
    Producer<byte[]> p1 = pulsarClient.newProducer().topic(topic1).create();
    Producer<byte[]> p2 = pulsarClient.newProducer().topic(topic2).create();
    final String subName = "test";
    // Create the subscription on each topic, then disconnect the consumer right away.
    for (String topic : topicList) {
        pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subName)
                .subscribe().close();
    }

    final int messages = 10;

    for (int i = 0; i < messages; i++) {
        String message = "my-message-" + i;
        p1.send(message.getBytes());
        p2.send(message.getBytes());
    }

    p1.close();
    p2.close();
    // Let the message expire
    for (String topic : topicList) {
        PersistentTopic persistentTopic =
                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
        persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(-1);
    }
    pulsar.getBrokerService().forEachTopic(Topic::checkMessageExpiry);

    // Look the subscriptions up once and keep them in the same order as topicList.
    PersistentSubscription subOfTopic1 = (PersistentSubscription)
            pulsar.getBrokerService().getTopicIfExists(topic1).get().get().getSubscription(subName);
    PersistentSubscription subOfTopic2 = (PersistentSubscription)
            pulsar.getBrokerService().getTopicIfExists(topic2).get().get().getSubscription(subName);
    List<PersistentSubscription> subscriptions = Arrays.asList(subOfTopic2, subOfTopic1);

    // Wait for checkMessageExpiry to take effect on every subscription.
    for (PersistentSubscription subscription : subscriptions) {
        Awaitility.await().atMost(2, TimeUnit.SECONDS)
                .until(() -> subscription.getExpiredMessageRate() != 0.0);
    }

    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut);
    // Decode with an explicit charset instead of the platform default.
    // NOTE(review): assumes the generator emits UTF-8 (or plain ASCII) text - confirm.
    String metricsStr = new String(statsOut.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
    Multimap<String, Metric> metrics = parseMetrics(metricsStr);

    // There should be one sample per topic, each carrying that topic's tags.
    List<Metric> cm = (List<Metric>) metrics.get("pulsar_subscription_last_expire_timestamp");
    assertExpirySamplesTaggedPerTopic(cm, ns, topicList);
    // The exported timestamp must match the private field on the subscription.
    Field field = PersistentSubscription.class.getDeclaredField("lastExpireTimestamp");
    field.setAccessible(true);
    for (int i = 0; i < subscriptions.size(); i++) {
        assertEquals((long) cm.get(i).value, (long) field.get(subscriptions.get(i)));
    }

    cm = (List<Metric>) metrics.get("pulsar_subscription_msg_rate_expired");
    assertExpirySamplesTaggedPerTopic(cm, ns, topicList);
    // Compare against the monitor's rate truncated to three fraction digits.
    field = PersistentSubscription.class.getDeclaredField("expiryMonitor");
    field.setAccessible(true);
    // Locale.ROOT and no grouping keep the formatted text parseable by Double.parseDouble
    // regardless of the JVM's default locale.
    NumberFormat nf = NumberFormat.getNumberInstance(java.util.Locale.ROOT);
    nf.setGroupingUsed(false);
    nf.setMaximumFractionDigits(3);
    nf.setRoundingMode(RoundingMode.DOWN);
    for (int i = 0; i < subscriptions.size(); i++) {
        PersistentMessageExpiryMonitor monitor =
                (PersistentMessageExpiryMonitor) field.get(subscriptions.get(i));
        assertEquals(cm.get(i).value, Double.parseDouble(nf.format(monitor.getMessageExpiryRate())));
    }

    cm = (List<Metric>) metrics.get("pulsar_subscription_total_msg_expired");
    assertExpirySamplesTaggedPerTopic(cm, ns, topicList);
    // Every published message should have been counted as expired.
    for (int i = 0; i < topicList.size(); i++) {
        // TestNG argument order is (actual, expected).
        assertEquals((long) cm.get(i).value, messages);
    }
}

/**
 * Asserts that {@code samples} holds exactly one sample per entry of {@code topics}, in the
 * same order, and that each sample is tagged with that topic and with {@code ns}.
 */
private static void assertExpirySamplesTaggedPerTopic(List<Metric> samples, String ns, List<String> topics) {
    assertEquals(samples.size(), topics.size());
    for (int i = 0; i < topics.size(); i++) {
        assertEquals(samples.get(i).tags.get("topic"), topics.get(i));
        assertEquals(samples.get(i).tags.get("namespace"), ns);
    }
}

@Test
public void testPerNamespaceStats() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Expand Down
Expand Up @@ -74,6 +74,9 @@ public class SubscriptionStats {
/** Total rate of messages expired on this subscription (msg/s). */
public double msgRateExpired;

/** Total messages expired on this subscription. */
public long totalMsgExpired;

/** Last message expire execution timestamp. */
public long lastExpireTimestamp;

Expand Down Expand Up @@ -119,6 +122,7 @@ public void reset() {
msgBacklogNoDelayed = 0;
unackedMessages = 0;
msgRateExpired = 0;
totalMsgExpired = 0;
lastExpireTimestamp = 0L;
consumers.clear();
consumersAfterMarkDeletePosition.clear();
Expand All @@ -139,6 +143,7 @@ public SubscriptionStats add(SubscriptionStats stats) {
this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
this.unackedMessages += stats.unackedMessages;
this.msgRateExpired += stats.msgRateExpired;
this.totalMsgExpired += stats.totalMsgExpired;
this.isReplicated |= stats.isReplicated;
this.isDurable |= stats.isDurable;
if (this.consumers.size() != stats.consumers.size()) {
Expand Down

0 comments on commit 63b3741

Please sign in to comment.