diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 2a0ee8356e1cb..4acac7b4f0e59 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2377,6 +2377,11 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "If true, export buffered metrics" ) private boolean metricsBufferResponse = false; + @FieldContext( + category = CATEGORY_METRICS, + doc = "If true, export compressed metrics data" + ) + private boolean enableMetricsDataCompression = false; @FieldContext( category = CATEGORY_METRICS, doc = "If true, export consumer level metrics otherwise namespace level" diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java index 1c3277d5ee480..43d4813917ee1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java @@ -67,6 +67,10 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) { try { res.setStatus(HTTP_STATUS_OK_200); res.setContentType("text/plain"); + if (enableCompressMetricsData()) { + //@see: https://github.com/prometheus/prometheus/blob/main/scrape/scrape.go + res.setHeader("Content-Encoding", "gzip"); + } generateMetrics(cluster, res.getOutputStream()); } catch (Exception e) { long end = System.currentTimeMillis(); @@ -99,6 +103,11 @@ protected void generateMetrics(String cluster, ServletOutputStream outputStream) PrometheusMetricsGeneratorUtils.generate(cluster, outputStream, metricsProviders); } + + protected boolean enableCompressMetricsData() { + return false; + } + @Override public void destroy() { if (executor != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index a321d1fcfe1a7..d855c990c76ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPOutputStream; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.bookkeeper.stats.StatsProvider; @@ -114,6 +115,7 @@ public static synchronized void generate(PulsarService pulsar, boolean includeTo List metricsProviders) throws IOException { ByteBuf buffer; boolean exposeBufferMetrics = pulsar.getConfiguration().isMetricsBufferResponse(); + boolean enableCompress = pulsar.getConfiguration().isEnableMetricsDataCompression(); if (!exposeBufferMetrics) { buffer = generate0(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, @@ -149,29 +151,7 @@ public static synchronized void generate(PulsarService pulsar, boolean includeTo log.debug("Current window start {}, current cached buf size {}", window.start(), buffer.readableBytes()); } - try { - if (out instanceof HttpOutput) { - HttpOutput output = (HttpOutput) out; - //no mem_copy and memory allocations here - ByteBuffer[] buffers = buffer.nioBuffers(); - for (ByteBuffer buffer0 : buffers) { - output.write(buffer0); - } - } else { - //read data from buffer and write it to output stream, with no more heap buffer(byte[]) allocation. - //not modify buffer readIndex/writeIndex here. - int readIndex = buffer.readerIndex(); - int readableBytes = buffer.readableBytes(); - for (int i = 0; i < readableBytes; i++) { - out.write(buffer.getByte(readIndex + i)); - } - } - } finally { - if (!exposeBufferMetrics && buffer.refCnt() > 0) { - buffer.release(); - log.debug("Metrics buffer released."); - } - } + writeOutMetricData(out, buffer, exposeBufferMetrics, enableCompress); } private static ByteBuf generate0(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, @@ -318,4 +298,40 @@ private static void generateManagedLedgerBookieClientMetrics(PulsarService pulsa // nop } } + + + private static void writeOutMetricData(OutputStream out, ByteBuf buffer, boolean exposeBufferMetrics, + boolean enableCompress) throws IOException { + try { + if (enableCompress) { + out = new GZIPOutputStream(out); + } + if (out instanceof HttpOutput) { + HttpOutput output = (HttpOutput) out; + //no mem_copy and memory allocations here + ByteBuffer[] buffers = buffer.nioBuffers(); + for (ByteBuffer buffer0 : buffers) { + output.write(buffer0); + } + } else { + //read data from buffer and write it to output stream, with no more heap buffer(byte[]) allocation. + //not modify buffer readIndex/writeIndex here. + int readIndex = buffer.readerIndex(); + int readableBytes = buffer.readableBytes(); + for (int i = 0; i < readableBytes; i++) { + out.write(buffer.getByte(readIndex + i)); + } + } + } finally { + try { + out.close(); + } finally { + if (!exposeBufferMetrics && buffer.refCnt() > 0) { + buffer.release(); + log.debug("Metrics buffer released."); + } + } + } + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java index 62754f774bf1f..4cd2c3f783276 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java @@ -43,6 +43,12 @@ public PulsarPrometheusMetricsServlet(PulsarService pulsar, boolean includeTopic this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel; } + + @Override + protected boolean enableCompressMetricsData() { + return pulsar.getConfiguration().isEnableMetricsDataCompression(); + } + @Override protected void generateMetrics(String cluster, ServletOutputStream outputStream) throws IOException { PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 66aa335e3083a..f605776ecd692 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -28,8 +28,10 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import io.jsonwebtoken.SignatureAlgorithm; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.lang.reflect.Field; import java.math.RoundingMode; import java.nio.charset.StandardCharsets; @@ -49,6 +51,7 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.zip.GZIPInputStream; import javax.crypto.SecretKey; import javax.naming.AuthenticationException; import lombok.Cleanup; @@ -72,6 +75,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils; import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; @@ -1494,6 +1498,34 @@ public void testSplitTopicAndPartitionLabel() throws Exception { consumer2.close(); } + + @Test + public void testCompressedMetricsData() throws Exception { + pulsar.getConfiguration().setEnableMetricsDataCompression(true); + + @Cleanup + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, true, statsOut); + byte[] bytes = statsOut.toByteArray(); + ByteArrayInputStream input = new ByteArrayInputStream(bytes); + GZIPInputStream gzipInput = new GZIPInputStream(input); + bytes = gzipInput.readAllBytes(); + String metricsStr = new String(bytes, StandardCharsets.UTF_8); + parseMetrics(metricsStr); + } + + @Test + public void testScrapCompressedMetricsData() throws IOException { + pulsar.getConfiguration().setEnableMetricsDataCompression(true); + + InputStream input = + PulsarFunctionTestUtils.getPrometheusMetrics0(pulsar.getConfiguration().getWebServicePort().get()); + GZIPInputStream gzip = new GZIPInputStream(input); + byte[] bytes = gzip.readAllBytes(); + String metricStr = new String(bytes, StandardCharsets.UTF_8); + parseMetrics(metricStr); + } + private void compareCompactionStateCount(List cm, double count) { assertEquals(cm.size(), 1); assertEquals(cm.get(0).tags.get("cluster"), "test"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java index 08708813be9e0..c980c1743fff2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker; +import java.io.InputStream; import java.nio.charset.StandardCharsets; import lombok.ToString; import lombok.extern.slf4j.Slf4j; @@ -54,6 +55,13 @@ public static String getPrometheusMetrics(int metricsPort) throws IOException { return result.toString(); } + public static InputStream getPrometheusMetrics0(int metricsPort) throws IOException { + URL url = new URL(String.format("http://%s:%s/metrics", "localhost", metricsPort)); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("GET"); + return conn.getInputStream(); + } + @Test void testParseMetrics() throws IOException { String sampleMetrics = IOUtils.toString(getClass().getClassLoader()