From e8cef6b2b576a0da007fa15f4bcc58bc00bbe5d1 Mon Sep 17 00:00:00 2001 From: Ravi Singal Date: Fri, 25 Dec 2020 20:32:55 +0530 Subject: [PATCH] Export kafka streams metrics --- .../core/kafkastreams/framework/KafkaStreamsApp.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java index 8fc4fe6..a046125 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java @@ -29,6 +29,8 @@ import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; + +import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.streams.KafkaStreams; @@ -45,6 +47,7 @@ import org.hypertrace.core.serviceframework.PlatformService; import org.hypertrace.core.serviceframework.config.ConfigClient; import org.hypertrace.core.serviceframework.config.ConfigUtils; +import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +58,7 @@ public abstract class KafkaStreamsApp extends PlatformService { public static final String KAFKA_STREAMS_CONFIG_KEY = "kafka.streams.config"; private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApp.class); protected KafkaStreams app; + private KafkaStreamsMetrics metrics; // Visible for testing only protected Topology topology; @@ -87,6 +91,10 @@ protected void doInit() { // create kstream app app = new KafkaStreams(topology, streamsConfigProps); + // export kafka streams metrics + metrics = new KafkaStreamsMetrics(app); + metrics.bindTo(PlatformMetricsRegistry.getMeterRegistry()); + // useful for resetting local state - during testing or any other scenarios where // state (rocksdb) needs to be reset if (streamsConfig.containsKey(CLEANUP_LOCAL_STATE)) { @@ -125,6 +133,9 @@ protected void doStart() { @Override protected void doStop() { + if (metrics != null) { + metrics.close(); + } app.close(Duration.ofSeconds(30)); }