Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.streams.KafkaStreams;
Expand All @@ -45,6 +47,7 @@
import org.hypertrace.core.serviceframework.PlatformService;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.hypertrace.core.serviceframework.config.ConfigUtils;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,6 +58,7 @@ public abstract class KafkaStreamsApp extends PlatformService {
public static final String KAFKA_STREAMS_CONFIG_KEY = "kafka.streams.config";
private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApp.class);
protected KafkaStreams app;
private KafkaStreamsMetrics metrics;

// Visible for testing only
protected Topology topology;
Expand Down Expand Up @@ -87,6 +91,10 @@ protected void doInit() {
// create kstream app
app = new KafkaStreams(topology, streamsConfigProps);

// export kafka streams metrics
metrics = new KafkaStreamsMetrics(app);
metrics.bindTo(PlatformMetricsRegistry.getMeterRegistry());

// useful for resetting local state - during testing or any other scenarios where
// state (rocksdb) needs to be reset
if (streamsConfig.containsKey(CLEANUP_LOCAL_STATE)) {
Expand Down Expand Up @@ -125,6 +133,9 @@ protected void doStart() {

@Override
protected void doStop() {
if (metrics != null) {
metrics.close();
}
app.close(Duration.ofSeconds(30));
}

Expand Down