Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2377,6 +2377,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "If true, export buffered metrics"
)
private boolean metricsBufferResponse = false;
@FieldContext(
category = CATEGORY_METRICS,
doc = "If true, export compressed metrics data"
)
private boolean enableMetricsDataCompression = false;
@FieldContext(
category = CATEGORY_METRICS,
doc = "If true, export consumer level metrics otherwise namespace level"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) {
try {
res.setStatus(HTTP_STATUS_OK_200);
res.setContentType("text/plain");
if (enableCompressMetricsData()) {
//@see: https://github.com/prometheus/prometheus/blob/main/scrape/scrape.go
res.setHeader("Content-Encoding", "gzip");
}
generateMetrics(cluster, res.getOutputStream());
} catch (Exception e) {
long end = System.currentTimeMillis();
Expand Down Expand Up @@ -99,6 +103,11 @@ protected void generateMetrics(String cluster, ServletOutputStream outputStream)
PrometheusMetricsGeneratorUtils.generate(cluster, outputStream, metricsProviders);
}


protected boolean enableCompressMetricsData() {
return false;
}

@Override
public void destroy() {
if (executor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
Expand Down Expand Up @@ -114,6 +115,7 @@ public static synchronized void generate(PulsarService pulsar, boolean includeTo
List<PrometheusRawMetricsProvider> metricsProviders) throws IOException {
ByteBuf buffer;
boolean exposeBufferMetrics = pulsar.getConfiguration().isMetricsBufferResponse();
boolean enableCompress = pulsar.getConfiguration().isEnableMetricsDataCompression();

if (!exposeBufferMetrics) {
buffer = generate0(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics,
Expand Down Expand Up @@ -149,29 +151,7 @@ public static synchronized void generate(PulsarService pulsar, boolean includeTo
log.debug("Current window start {}, current cached buf size {}", window.start(), buffer.readableBytes());
}

try {
if (out instanceof HttpOutput) {
HttpOutput output = (HttpOutput) out;
//no mem_copy and memory allocations here
ByteBuffer[] buffers = buffer.nioBuffers();
for (ByteBuffer buffer0 : buffers) {
output.write(buffer0);
}
} else {
//read data from buffer and write it to output stream, with no more heap buffer(byte[]) allocation.
//not modify buffer readIndex/writeIndex here.
int readIndex = buffer.readerIndex();
int readableBytes = buffer.readableBytes();
for (int i = 0; i < readableBytes; i++) {
out.write(buffer.getByte(readIndex + i));
}
}
} finally {
if (!exposeBufferMetrics && buffer.refCnt() > 0) {
buffer.release();
log.debug("Metrics buffer released.");
}
}
writeOutMetricData(out, buffer, exposeBufferMetrics, enableCompress);
}

private static ByteBuf generate0(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
Expand Down Expand Up @@ -318,4 +298,40 @@ private static void generateManagedLedgerBookieClientMetrics(PulsarService pulsa
// nop
}
}


private static void writeOutMetricData(OutputStream out, ByteBuf buffer, boolean exposeBufferMetrics,
boolean enableCompress) throws IOException {
try {
if (enableCompress) {
out = new GZIPOutputStream(out);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think GZIPOutputStream will create a lot of garbage on the JVM heap :|

Could we just use org.apache.pulsar.common.compression.CompressionCodecZLib on the buffer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat But CompressionCodecZLib not in GZIP format, and Prometheus read metrics data in GZIP format:


	if s.gzipr == nil {
		s.buf = bufio.NewReader(resp.Body)
		s.gzipr, err = gzip.NewReader(s.buf)
		if err != nil {
			return "", err
		}
	} else {
		s.buf.Reset(resp.Body)
		if err = s.gzipr.Reset(s.buf); err != nil {
			return "", err
		}
	}

	n, err := io.Copy(w, io.LimitReader(s.gzipr, s.bodySizeLimit))
	s.gzipr.Close()
	if err != nil {
		return "", err
	}

}
if (out instanceof HttpOutput) {
HttpOutput output = (HttpOutput) out;
//no mem_copy and memory allocations here
ByteBuffer[] buffers = buffer.nioBuffers();
for (ByteBuffer buffer0 : buffers) {
output.write(buffer0);
}
} else {
//read data from buffer and write it to output stream, with no more heap buffer(byte[]) allocation.
//not modify buffer readIndex/writeIndex here.
int readIndex = buffer.readerIndex();
int readableBytes = buffer.readableBytes();
for (int i = 0; i < readableBytes; i++) {
out.write(buffer.getByte(readIndex + i));
}
}
} finally {
try {
out.close();
} finally {
if (!exposeBufferMetrics && buffer.refCnt() > 0) {
buffer.release();
log.debug("Metrics buffer released.");
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public PulsarPrometheusMetricsServlet(PulsarService pulsar, boolean includeTopic
this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel;
}


@Override
protected boolean enableCompressMetricsData() {
return pulsar.getConfiguration().isEnableMetricsDataCompression();
}

@Override
protected void generateMetrics(String cluster, ServletOutputStream outputStream) throws IOException {
PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import io.jsonwebtoken.SignatureAlgorithm;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
Expand All @@ -49,6 +51,7 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import javax.crypto.SecretKey;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
Expand All @@ -72,6 +75,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
Expand Down Expand Up @@ -1494,6 +1498,34 @@ public void testSplitTopicAndPartitionLabel() throws Exception {
consumer2.close();
}


@Test
public void testCompressedMetricsData() throws Exception {
pulsar.getConfiguration().setEnableMetricsDataCompression(true);

@Cleanup
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, true, statsOut);
byte[] bytes = statsOut.toByteArray();
ByteArrayInputStream input = new ByteArrayInputStream(bytes);
GZIPInputStream gzipInput = new GZIPInputStream(input);
bytes = gzipInput.readAllBytes();
String metricsStr = new String(bytes, StandardCharsets.UTF_8);
parseMetrics(metricsStr);
}

@Test
public void testScrapCompressedMetricsData() throws IOException {
pulsar.getConfiguration().setEnableMetricsDataCompression(true);

InputStream input =
PulsarFunctionTestUtils.getPrometheusMetrics0(pulsar.getConfiguration().getWebServicePort().get());
GZIPInputStream gzip = new GZIPInputStream(input);
byte[] bytes = gzip.readAllBytes();
String metricStr = new String(bytes, StandardCharsets.UTF_8);
parseMetrics(metricStr);
}

private void compareCompactionStateCount(List<Metric> cm, double count) {
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).tags.get("cluster"), "test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -54,6 +55,13 @@ public static String getPrometheusMetrics(int metricsPort) throws IOException {
return result.toString();
}

public static InputStream getPrometheusMetrics0(int metricsPort) throws IOException {
URL url = new URL(String.format("http://%s:%s/metrics", "localhost", metricsPort));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
return conn.getInputStream();
}

@Test
void testParseMetrics() throws IOException {
String sampleMetrics = IOUtils.toString(getClass().getClassLoader()
Expand Down