Skip to content

Commit

Permalink
Add support of PrometheusRawMetricsProvider for the Pulsar-Proxy (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet authored and Aparajita Singh committed Mar 21, 2022
1 parent c3a3a01 commit 5026a30
Show file tree
Hide file tree
Showing 10 changed files with 444 additions and 94 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.prometheus;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Enumeration;
import java.util.List;
import org.apache.pulsar.common.util.SimpleTextOutputStream;

/**
* Generate metrics in a text format suitable to be consumed by Prometheus.
* Format specification can be found at {@link https://prometheus.io/docs/instrumenting/exposition_formats/}
*/
public class PrometheusMetricsGeneratorUtils {

public static void generate(String cluster, OutputStream out,
List<PrometheusRawMetricsProvider> metricsProviders)
throws IOException {
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
try {
SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
generateSystemMetrics(stream, cluster);
if (metricsProviders != null) {
for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) {
metricsProvider.generate(stream);
}
}
out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
} finally {
buf.release();
}
}

public static void generateSystemMetrics(SimpleTextOutputStream stream, String cluster) {
Enumeration<Collector.MetricFamilySamples> metricFamilySamples =
CollectorRegistry.defaultRegistry.metricFamilySamples();
while (metricFamilySamples.hasMoreElements()) {
Collector.MetricFamilySamples metricFamily = metricFamilySamples.nextElement();

// Write type of metric
stream.write("# TYPE ").write(metricFamily.name).write(' ')
.write(getTypeStr(metricFamily.type)).write('\n');

for (int i = 0; i < metricFamily.samples.size(); i++) {
Collector.MetricFamilySamples.Sample sample = metricFamily.samples.get(i);
stream.write(sample.name);
stream.write("{cluster=\"").write(cluster).write('"');
for (int j = 0; j < sample.labelNames.size(); j++) {
String labelValue = sample.labelValues.get(j);
if (labelValue != null) {
labelValue = labelValue.replace("\"", "\\\"");
}

stream.write(",");
stream.write(sample.labelNames.get(j));
stream.write("=\"");
stream.write(labelValue);
stream.write('"');
}

stream.write("} ");
stream.write(Collector.doubleToGoString(sample.value));
stream.write('\n');
}
}
}

static String getTypeStr(Collector.Type type) {
switch (type) {
case COUNTER:
return "counter";
case GAUGE:
return "gauge";
case SUMMARY :
return "summary";
case HISTOGRAM:
return "histogram";
case UNTYPED:
default:
return "untyped";
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.stats.prometheus;

import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.EOFException;
import java.io.IOException;
Expand All @@ -28,36 +28,28 @@
import java.util.concurrent.Executors;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.PulsarService;
import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrometheusMetricsServlet extends HttpServlet {

private static final long serialVersionUID = 1L;
private static final int HTTP_STATUS_OK_200 = 200;
private static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500;

private final PulsarService pulsar;
private final boolean shouldExportTopicMetrics;
private final boolean shouldExportConsumerMetrics;
private final boolean shouldExportProducerMetrics;
private final long metricsServletTimeoutMs;
private final boolean splitTopicAndPartitionLabel;
private List<PrometheusRawMetricsProvider> metricsProviders;
private final String cluster;
protected List<PrometheusRawMetricsProvider> metricsProviders;

private ExecutorService executor = null;

public PrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
boolean shouldExportProducerMetrics, boolean splitTopicAndPartitionLabel) {
this.pulsar = pulsar;
this.shouldExportTopicMetrics = includeTopicMetrics;
this.shouldExportConsumerMetrics = includeConsumerMetrics;
this.shouldExportProducerMetrics = shouldExportProducerMetrics;
this.metricsServletTimeoutMs = pulsar.getConfiguration().getMetricsServletTimeoutMs();
this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel;
public PrometheusMetricsServlet(long metricsServletTimeoutMs, String cluster) {
this.metricsServletTimeoutMs = metricsServletTimeoutMs;
this.cluster = cluster;
}

@Override
Expand All @@ -66,19 +58,16 @@ public void init() throws ServletException {
}

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
protected void doGet(HttpServletRequest request, HttpServletResponse response) {
AsyncContext context = request.startAsync();
context.setTimeout(metricsServletTimeoutMs);
executor.execute(safeRun(() -> {
long start = System.currentTimeMillis();
HttpServletResponse res = (HttpServletResponse) context.getResponse();
try {
res.setStatus(HttpStatus.OK_200);
res.setStatus(HTTP_STATUS_OK_200);
res.setContentType("text/plain");
PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics,
shouldExportProducerMetrics, splitTopicAndPartitionLabel, res.getOutputStream(),
metricsProviders);
generateMetrics(cluster, res.getOutputStream());
} catch (Exception e) {
long end = System.currentTimeMillis();
long time = end - start;
Expand All @@ -90,7 +79,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
} else {
log.error("Failed to generate prometheus stats, {} ms elapsed", time, e);
}
res.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
} finally {
long end = System.currentTimeMillis();
long time = end - start;
Expand All @@ -106,6 +95,10 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
}));
}

protected void generateMetrics(String cluster, ServletOutputStream outputStream) throws IOException {
PrometheusMetricsGeneratorUtils.generate(cluster, outputStream, metricsProviders);
}

@Override
public void destroy() {
if (executor != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.prometheus;
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
Expand Down Expand Up @@ -244,7 +244,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {

// packages management service
private Optional<PackagesManagement> packagesManagement = Optional.empty();
private PrometheusMetricsServlet metricsServlet;
private PulsarPrometheusMetricsServlet metricsServlet;
private List<PrometheusRawMetricsProvider> pendingMetricsProviders;

private MetadataStoreExtended localMetadataStore;
Expand Down Expand Up @@ -413,7 +413,7 @@ public CompletableFuture<Void> closeAsync() {
}
}

metricsServlet = null;
resetMetricsServlet();

if (this.webSocketService != null) {
this.webSocketService.close();
Expand Down Expand Up @@ -570,6 +570,10 @@ public CompletableFuture<Void> closeAsync() {
}
}

private synchronized void resetMetricsServlet() {
metricsServlet = null;
}

private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory(getClass().getSimpleName() + "-shutdown"));
Expand Down Expand Up @@ -698,16 +702,7 @@ public void start() throws PulsarServerException {
this.brokerAdditionalServlets = AdditionalServlets.load(config);

this.webService = new WebService(this);
this.metricsServlet = new PrometheusMetricsServlet(
this, config.isExposeTopicLevelMetricsInPrometheus(),
config.isExposeConsumerLevelMetricsInPrometheus(),
config.isExposeProducerLevelMetricsInPrometheus(),
config.isSplitTopicAndPartitionLabelInPrometheus());
if (pendingMetricsProviders != null) {
pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
this.pendingMetricsProviders = null;
}

createMetricsServlet();
this.addWebServerHandlers(webService, metricsServlet, this.config);
this.webService.start();
heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(this.advertisedAddress, this.config);
Expand Down Expand Up @@ -825,8 +820,20 @@ public void start() throws PulsarServerException {
}
}

private synchronized void createMetricsServlet() {
this.metricsServlet = new PulsarPrometheusMetricsServlet(
this, config.isExposeTopicLevelMetricsInPrometheus(),
config.isExposeConsumerLevelMetricsInPrometheus(),
config.isExposeProducerLevelMetricsInPrometheus(),
config.isSplitTopicAndPartitionLabelInPrometheus());
if (pendingMetricsProviders != null) {
pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
this.pendingMetricsProviders = null;
}
}

private void addWebServerHandlers(WebService webService,
PrometheusMetricsServlet metricsServlet,
PulsarPrometheusMetricsServlet metricsServlet,
ServiceConfiguration config)
throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException,
DeploymentException {
Expand Down Expand Up @@ -1522,7 +1529,7 @@ public ResourceUsageTransportManager getResourceUsageTransportManager() {
return resourceUsageTransportManager;
}

public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
if (metricsServlet == null) {
if (pendingMetricsProviders == null) {
pendingMetricsProviders = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
package org.apache.pulsar.broker.stats.prometheus;

import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.generateSystemMetrics;
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.getTypeStr;
import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.prometheus.client.Collector;
import io.prometheus.client.Collector.MetricFamilySamples;
import io.prometheus.client.Collector.MetricFamilySamples.Sample;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.Gauge.Child;
Expand All @@ -34,7 +34,6 @@
import java.io.Writer;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -230,53 +229,4 @@ private static void generateManagedLedgerBookieClientMetrics(PulsarService pulsa
}
}

private static void generateSystemMetrics(SimpleTextOutputStream stream, String cluster) {
Enumeration<MetricFamilySamples> metricFamilySamples = CollectorRegistry.defaultRegistry.metricFamilySamples();
while (metricFamilySamples.hasMoreElements()) {
MetricFamilySamples metricFamily = metricFamilySamples.nextElement();

// Write type of metric
stream.write("# TYPE ").write(metricFamily.name).write(' ')
.write(getTypeStr(metricFamily.type)).write('\n');

for (int i = 0; i < metricFamily.samples.size(); i++) {
Sample sample = metricFamily.samples.get(i);
stream.write(sample.name);
stream.write("{cluster=\"").write(cluster).write('"');
for (int j = 0; j < sample.labelNames.size(); j++) {
String labelValue = sample.labelValues.get(j);
if (labelValue != null) {
labelValue = labelValue.replace("\"", "\\\"");
}

stream.write(",");
stream.write(sample.labelNames.get(j));
stream.write("=\"");
stream.write(labelValue);
stream.write('"');
}

stream.write("} ");
stream.write(Collector.doubleToGoString(sample.value));
stream.write('\n');
}
}
}

static String getTypeStr(Collector.Type type) {
switch (type) {
case COUNTER:
return "counter";
case GAUGE:
return "gauge";
case SUMMARY :
return "summary";
case HISTOGRAM:
return "histogram";
case UNTYPED:
default:
return "untyped";
}
}

}
Loading

0 comments on commit 5026a30

Please sign in to comment.