Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1826,6 +1826,39 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean exposePreciseBacklogInPrometheus = false;

// Metrics Sender to Pulsar Topics
// Settings for the optional component that publishes broker metrics to Pulsar topics.

// Master switch; the default is disabled (false).
@FieldContext(
category = CATEGORY_METRICS,
doc = "Enable pulsar to send its metrics through topics"
)
private boolean metricsSenderEnabled = false;
// Tenant dedicated to receiving the metrics; defaults to "pulsar-metrics-tenant".
@FieldContext(
category = CATEGORY_METRICS,
doc = "Define metrics dedicated tenant to send metrics"
)
private String metricsSenderDestinationTenant = "pulsar-metrics-tenant";

// Namespace dedicated to receiving the metrics; defaults to "pulsar-metrics-ns".
@FieldContext(
    category = CATEGORY_METRICS,
    doc = "Define metrics dedicated namespace to send metrics"
)
private String metricsSenderDestinationNamespace = "pulsar-metrics-ns";
// Interval between two metric publications, expressed in seconds; defaults to 30.
// NOTE(review): declared as boxed Integer, so null is representable — confirm whether a
// primitive int was intended before changing it (accessor names may depend on the type).
@FieldContext(
category = CATEGORY_METRICS,
doc = "Define metrics sending interval in seconds"
)
private Integer metricsSenderIntervalInSeconds = 30;
// Whether topic-level metrics are included in what the sender publishes; defaults to false.
// NOTE(review): boxed Boolean, unlike metricsSenderEnabled which is a primitive — confirm intent.
@FieldContext(
category = CATEGORY_METRICS,
doc = "Metrics Sender include topic metrics"
)
private Boolean metricsSenderIncludeTopicMetrics = false;
// Whether consumer-level metrics are included in what the sender publishes; defaults to false.
// NOTE(review): boxed Boolean, unlike metricsSenderEnabled which is a primitive — confirm intent.
@FieldContext(
category = CATEGORY_METRICS,
doc = "Metrics Sender include consumer metrics"
)
private Boolean metricsSenderIncludeConsumerMetrics = false;

/**** --- Functions --- ****/
@FieldContext(
category = CATEGORY_FUNCTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.sender.MetricsSender;
import org.apache.pulsar.broker.stats.sender.MetricsSenderConfiguration;
import org.apache.pulsar.broker.stats.sender.PulsarMetricsSender;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
Expand Down Expand Up @@ -197,6 +200,7 @@ public class PulsarService implements AutoCloseable {
private final ShutdownService shutdownService;

private MetricsGenerator metricsGenerator;
private MetricsSender metricsSender;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
Expand Down Expand Up @@ -378,6 +382,11 @@ public void close() throws PulsarServerException {
transactionBufferClient.close();
}

if (metricsSender != null) {
metricsSender.close();
metricsSender = null;
}

state = State.Closed;
isClosedCondition.signalAll();
} catch (Exception e) {
Expand Down Expand Up @@ -637,6 +646,12 @@ public Boolean get() {
LOG.info("messaging service is ready, {}, cluster={}, configs={}", bootstrapMessage,
config.getClusterName(), ReflectionToStringBuilder.toString(config));

if (config.isMetricsSenderEnabled()) {
LOG.info("Starting Metrics Sender");
this.metricsSender = new PulsarMetricsSender(this, new MetricsSenderConfiguration(this.config));
this.metricsSender.start();
}

state = State.Started;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.sender.MetricsSender;
import org.apache.pulsar.broker.stats.sender.PulsarMetrics;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
Expand Down Expand Up @@ -84,6 +86,44 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
});
}

/**
 * Collects stats for every namespace found in the broker service's multi-layer topic map and
 * pushes them through the given {@link MetricsSender}.
 *
 * <p>Zero-valued broker-level metrics are always sent first. Then, for each namespace:
 * <ul>
 *   <li>when {@code includeTopicMetrics} is true, each topic's stats are sent individually,
 *       followed by that namespace's topic count;</li>
 *   <li>otherwise topic stats are folded into one aggregate that is sent once per namespace.</li>
 * </ul>
 *
 * @param pulsar                 broker instance providing configuration and the topic map
 * @param includeTopicMetrics    send per-topic metrics instead of namespace aggregates
 * @param includeConsumerMetrics forwarded to the per-topic stats collection
 * @param metricsSender          destination for every generated metric
 */
public static void generate(PulsarService pulsar, boolean includeTopicMetrics,
        boolean includeConsumerMetrics, MetricsSender metricsSender) {
    String cluster = pulsar.getConfiguration().getClusterName();
    AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
    TopicStats.resetTypes();
    TopicStats topicStats = localTopicStats.get();

    printDefaultBrokerStats(metricsSender, cluster);

    LongAdder topicsCount = new LongAdder();

    pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
        namespaceStats.reset();
        // The topic count is published with a namespace label, so it has to start from zero for
        // each namespace; otherwise later namespaces report a running total of all earlier ones.
        topicsCount.reset();

        bundlesMap.forEach((bundle, topicsMap) -> {
            topicsMap.forEach((name, topic) -> {
                getTopicStats(topic, topicStats, includeConsumerMetrics,
                        pulsar.getConfiguration().isExposePreciseBacklogInPrometheus());

                if (includeTopicMetrics) {
                    topicsCount.add(1);
                    TopicStats.printTopicStats(metricsSender, cluster, namespace, name, topicStats);
                } else {
                    namespaceStats.updateStats(topicStats);
                }
            });
        });

        if (!includeTopicMetrics) {
            // Only include namespace level stats if we don't have the per-topic, otherwise we're going to report
            // the same data twice, and it will make the aggregation difficult
            printNamespaceStats(metricsSender, cluster, namespace, namespaceStats);
        } else {
            printTopicsCountStats(metricsSender, cluster, namespace, topicsCount);
        }
    });
}

private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
boolean getPreciseBacklog) {
stats.reset();
Expand Down Expand Up @@ -215,11 +255,33 @@ private static void printDefaultBrokerStats(SimpleTextOutputStream stream, Strin
metric(stream, cluster, "pulsar_msg_backlog", 0);
}

/**
 * Sends a zero-valued sample for each broker-level metric.
 *
 * <p>The placeholders are needed so that an available broker still shows up in the brokers
 * dashboard when it has no topic or traffic.
 *
 * @param metricsSender destination for the samples
 * @param cluster       value of the {@code cluster} label
 */
private static void printDefaultBrokerStats(MetricsSender metricsSender, String cluster) {
    // Order matters: samples are sent in exactly this sequence.
    final String[] brokerMetricNames = {
        "pulsar_topics_count",
        "pulsar_subscriptions_count",
        "pulsar_producers_count",
        "pulsar_consumers_count",
        "pulsar_rate_in",
        "pulsar_rate_out",
        "pulsar_throughput_in",
        "pulsar_throughput_out",
        "pulsar_storage_size",
        "pulsar_storage_write_rate",
        "pulsar_storage_read_rate",
        "pulsar_msg_backlog",
    };
    for (String metricName : brokerMetricNames) {
        metric(metricsSender, cluster, metricName, 0);
    }
}

/**
 * Writes the {@code pulsar_topics_count} sample for one namespace to the text stream.
 *
 * @param stream      output the sample is written to
 * @param cluster     value of the {@code cluster} label
 * @param namespace   value of the {@code namespace} label
 * @param topicsCount adder whose current sum is reported
 */
private static void printTopicsCountStats(SimpleTextOutputStream stream, String cluster, String namespace,
        LongAdder topicsCount) {
    final long total = topicsCount.sum();
    metric(stream, cluster, namespace, "pulsar_topics_count", total);
}

/**
 * Sends the {@code pulsar_topics_count} sample for one namespace through the metrics sender.
 *
 * @param metricsSender destination for the sample
 * @param cluster       value of the {@code cluster} label
 * @param namespace     value of the {@code namespace} label
 * @param topicsCount   adder whose current sum is reported
 */
private static void printTopicsCountStats(MetricsSender metricsSender, String cluster, String namespace,
        LongAdder topicsCount) {
    final long total = topicsCount.sum();
    metric(metricsSender, cluster, namespace, "pulsar_topics_count", total);
}

private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace,
AggregatedNamespaceStats stats) {
metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
Expand Down Expand Up @@ -313,34 +375,143 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl
}
}

/**
 * Sends every aggregated metric of one namespace through the metrics sender.
 *
 * <p>Samples go out in a fixed order: entity counts, rates and throughput, cumulative counters,
 * storage figures, delayed messages, local backlog, storage write latency buckets, entry size
 * buckets, and finally one group of replication samples per remote cluster (if any).
 *
 * @param metricsSender destination for the samples
 * @param cluster       value of the {@code cluster} label
 * @param namespace     value of the {@code namespace} label
 * @param stats         aggregate to report; both of its bucket holders are refreshed here
 */
private static void printNamespaceStats(MetricsSender metricsSender, String cluster, String namespace,
        AggregatedNamespaceStats stats) {
    // Entity counts.
    metric(metricsSender, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
    metric(metricsSender, cluster, namespace, "pulsar_subscriptions_count", stats.subscriptionsCount);
    metric(metricsSender, cluster, namespace, "pulsar_producers_count", stats.producersCount);
    metric(metricsSender, cluster, namespace, "pulsar_consumers_count", stats.consumersCount);

    // Message rates and throughput.
    metric(metricsSender, cluster, namespace, "pulsar_rate_in", stats.rateIn);
    metric(metricsSender, cluster, namespace, "pulsar_rate_out", stats.rateOut);
    metric(metricsSender, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
    metric(metricsSender, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);

    // Cumulative byte and message counters.
    metric(metricsSender, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
    metric(metricsSender, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
    metric(metricsSender, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter);
    metric(metricsSender, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);

    // Storage sizes and rates.
    metric(metricsSender, cluster, namespace, "pulsar_storage_size", stats.storageSize);
    metric(metricsSender, cluster, namespace, "pulsar_storage_backlog_size", stats.backlogSize);
    metric(metricsSender, cluster, namespace, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
    metric(metricsSender, cluster, namespace, "pulsar_storage_write_rate", stats.storageWriteRate);
    metric(metricsSender, cluster, namespace, "pulsar_storage_read_rate", stats.storageReadRate);

    metric(metricsSender, cluster, namespace, "pulsar_subscription_delayed", stats.msgDelayed);

    // The local backlog reuses the remote-cluster shape with the fixed label value "local".
    metricWithRemoteCluster(metricsSender, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog);

    // Storage write latency: one sample per bucket, index-aligned with the names below.
    final String[] writeLatencyNames = {
        "pulsar_storage_write_latency_le_0_5",
        "pulsar_storage_write_latency_le_1",
        "pulsar_storage_write_latency_le_5",
        "pulsar_storage_write_latency_le_10",
        "pulsar_storage_write_latency_le_20",
        "pulsar_storage_write_latency_le_50",
        "pulsar_storage_write_latency_le_100",
        "pulsar_storage_write_latency_le_200",
        "pulsar_storage_write_latency_le_1000",
        "pulsar_storage_write_latency_overflow",
    };
    stats.storageWriteLatencyBuckets.refresh();
    final long[] writeLatencyCounts = stats.storageWriteLatencyBuckets.getBuckets();
    for (int i = 0; i < writeLatencyNames.length; i++) {
        metric(metricsSender, cluster, namespace, writeLatencyNames[i], writeLatencyCounts[i]);
    }
    metric(metricsSender, cluster, namespace, "pulsar_storage_write_latency_count",
            stats.storageWriteLatencyBuckets.getCount());
    metric(metricsSender, cluster, namespace, "pulsar_storage_write_latency_sum",
            stats.storageWriteLatencyBuckets.getSum());

    // Entry size: same layout as the latency buckets.
    final String[] entrySizeNames = {
        "pulsar_entry_size_le_128",
        "pulsar_entry_size_le_512",
        "pulsar_entry_size_le_1_kb",
        "pulsar_entry_size_le_2_kb",
        "pulsar_entry_size_le_4_kb",
        "pulsar_entry_size_le_16_kb",
        "pulsar_entry_size_le_100_kb",
        "pulsar_entry_size_le_1_mb",
        "pulsar_entry_size_le_overflow",
    };
    stats.entrySizeBuckets.refresh();
    final long[] entrySizeCounts = stats.entrySizeBuckets.getBuckets();
    for (int i = 0; i < entrySizeNames.length; i++) {
        metric(metricsSender, cluster, namespace, entrySizeNames[i], entrySizeCounts[i]);
    }
    metric(metricsSender, cluster, namespace, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
    metric(metricsSender, cluster, namespace, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());

    // Replication samples exist only when at least one remote cluster is tracked.
    if (stats.replicationStats.isEmpty()) {
        return;
    }
    stats.replicationStats.forEach((remote, repl) -> {
        metricWithRemoteCluster(metricsSender, cluster, namespace, "pulsar_replication_rate_in", remote,
                repl.msgRateIn);
        metricWithRemoteCluster(metricsSender, cluster, namespace, "pulsar_replication_rate_out", remote,
                repl.msgRateOut);
        metricWithRemoteCluster(metricsSender, cluster, namespace, "pulsar_replication_throughput_in", remote,
                repl.msgThroughputIn);
        metricWithRemoteCluster(metricsSender, cluster, namespace, "pulsar_replication_throughput_out", remote,
                repl.msgThroughputOut);
        metricWithRemoteCluster(metricsSender, cluster, namespace, "pulsar_replication_backlog", remote,
                repl.replicationBacklog);
    });
}

private static void metric(SimpleTextOutputStream stream, String cluster, String name,
long value) {
long value) {
TopicStats.metricType(stream, name);
stream.write(name)
.write("{cluster=\"").write(cluster).write("\"} ")
.write(value).write(' ').write(System.currentTimeMillis())
.write('\n');
}

/**
 * Sends one broker-level sample through the metrics sender.
 *
 * <p>The body is {@code name{broker="<componentLabel>",cluster="<cluster>"} <value> <currentTimeMillis>};
 * the head is whatever {@code TopicStats.metricType(name)} returns.
 *
 * @param metricsSender destination; also supplies the {@code broker} label value
 * @param cluster       value of the {@code cluster} label
 * @param name          metric name
 * @param value         sample value
 */
private static void metric(MetricsSender metricsSender, String cluster, String name, long value) {
    final String typeHeader = TopicStats.metricType(name);
    final String labels = "{broker=\"" + metricsSender.getComponentLabel() + "\",cluster=\"" + cluster + "\"} ";
    final String sample = name + labels + value + " " + System.currentTimeMillis();
    final PulsarMetrics payload = new PulsarMetrics(typeHeader, sample);
    metricsSender.send(payload);
}

/**
 * Writes one namespace-level integer sample to the text stream as
 * {@code name{cluster="<cluster>",namespace="<namespace>"} <value> <currentTimeMillis>}
 * followed by a newline.
 *
 * @param stream    output the sample is written to
 * @param cluster   value of the {@code cluster} label
 * @param namespace value of the {@code namespace} label
 * @param name      metric name
 * @param value     sample value
 */
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
        long value) {
    TopicStats.metricType(stream, name);
    // Metric name and label set.
    stream.write(name)
            .write("{cluster=\"")
            .write(cluster)
            .write("\",namespace=\"")
            .write(namespace)
            .write("\"} ");
    // Value, timestamp, line terminator.
    stream.write(value)
            .write(' ')
            .write(System.currentTimeMillis())
            .write('\n');
}

/**
 * Sends one namespace-level integer sample through the metrics sender.
 *
 * <p>The body is
 * {@code name{broker="<componentLabel>",cluster="<cluster>",namespace="<namespace>"} <value> <currentTimeMillis>};
 * the head is whatever {@code TopicStats.metricType(name)} returns.
 *
 * @param metricsSender destination; also supplies the {@code broker} label value
 * @param cluster       value of the {@code cluster} label
 * @param namespace     value of the {@code namespace} label
 * @param name          metric name
 * @param value         sample value
 */
private static void metric(MetricsSender metricsSender, String cluster, String namespace, String name,
        long value) {
    final String typeHeader = TopicStats.metricType(name);
    final String labels = "{broker=\"" + metricsSender.getComponentLabel() + "\",cluster=\"" + cluster
            + "\",namespace=\"" + namespace + "\"} ";
    final String sample = name + labels + value + " " + System.currentTimeMillis();
    final PulsarMetrics payload = new PulsarMetrics(typeHeader, sample);
    metricsSender.send(payload);
}

/**
 * Writes one namespace-level floating-point sample to the text stream as
 * {@code name{cluster="<cluster>",namespace="<namespace>"} <value> <currentTimeMillis>}
 * followed by a newline.
 *
 * @param stream    output the sample is written to
 * @param cluster   value of the {@code cluster} label
 * @param namespace value of the {@code namespace} label
 * @param name      metric name
 * @param value     sample value
 */
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
        double value) {
    TopicStats.metricType(stream, name);
    // Metric name and label set.
    stream.write(name)
            .write("{cluster=\"")
            .write(cluster)
            .write("\",namespace=\"")
            .write(namespace)
            .write("\"} ");
    // Value, timestamp, line terminator.
    stream.write(value)
            .write(' ')
            .write(System.currentTimeMillis())
            .write('\n');
}

/**
 * Sends one namespace-level floating-point sample through the metrics sender.
 *
 * <p>The body is
 * {@code name{broker="<componentLabel>",cluster="<cluster>",namespace="<namespace>"} <value> <currentTimeMillis>};
 * the head is whatever {@code TopicStats.metricType(name)} returns.
 *
 * @param metricsSender destination; also supplies the {@code broker} label value
 * @param cluster       value of the {@code cluster} label
 * @param namespace     value of the {@code namespace} label
 * @param name          metric name
 * @param value         sample value
 */
private static void metric(MetricsSender metricsSender, String cluster, String namespace, String name,
        double value) {
    final String typeHeader = TopicStats.metricType(name);
    final String labels = "{broker=\"" + metricsSender.getComponentLabel() + "\",cluster=\"" + cluster
            + "\",namespace=\"" + namespace + "\"} ";
    final String sample = name + labels + value + " " + System.currentTimeMillis();
    final PulsarMetrics payload = new PulsarMetrics(typeHeader, sample);
    metricsSender.send(payload);
}

/**
 * Writes one namespace-level sample carrying a {@code remote_cluster} label to the text stream as
 * {@code name{cluster="<cluster>",namespace="<namespace>",remote_cluster="<remoteCluster>"} <value> <currentTimeMillis>}
 * followed by a newline.
 *
 * @param stream        output the sample is written to
 * @param cluster       value of the {@code cluster} label
 * @param namespace     value of the {@code namespace} label
 * @param name          metric name
 * @param remoteCluster value of the {@code remote_cluster} label
 * @param value         sample value
 */
private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
        String name, String remoteCluster, double value) {
    TopicStats.metricType(stream, name);
    // Metric name plus the first two labels (closing quote of namespace is written with the next label).
    stream.write(name)
            .write("{cluster=\"")
            .write(cluster)
            .write("\",namespace=\"")
            .write(namespace);
    // Third label, which also closes the label set.
    stream.write("\",remote_cluster=\"")
            .write(remoteCluster)
            .write("\"} ");
    // Value, timestamp, line terminator.
    stream.write(value)
            .write(' ')
            .write(System.currentTimeMillis())
            .write('\n');
}

/**
 * Sends one namespace-level sample carrying a {@code remote_cluster} label through the metrics sender.
 *
 * <p>The body is
 * {@code name{broker="<componentLabel>",cluster="<cluster>",namespace="<namespace>",remote_cluster="<remoteCluster>"} <value> <currentTimeMillis>};
 * the head is whatever {@code TopicStats.metricType(name)} returns.
 *
 * @param metricsSender destination; also supplies the {@code broker} label value
 * @param cluster       value of the {@code cluster} label
 * @param namespace     value of the {@code namespace} label
 * @param name          metric name
 * @param remoteCluster value of the {@code remote_cluster} label
 * @param value         sample value
 */
private static void metricWithRemoteCluster(MetricsSender metricsSender, String cluster, String namespace,
        String name, String remoteCluster, double value) {
    final String typeHeader = TopicStats.metricType(name);
    final String labels = "{broker=\"" + metricsSender.getComponentLabel() + "\",cluster=\"" + cluster
            + "\",namespace=\"" + namespace + "\",remote_cluster=\"" + remoteCluster + "\"} ";
    final String sample = name + labels + value + " " + System.currentTimeMillis();
    final PulsarMetrics payload = new PulsarMetrics(typeHeader, sample);
    metricsSender.send(payload);
}
}
Loading