Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add extension for custom metrics (5.3.x) #2997

Merged
merged 4 commits into from Jul 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -149,6 +149,11 @@ public class KsqlConfig extends AbstractConfig {
"A list of tags to be included with emitted JMX metrics, formatted as a string of key:value "
+ "pairs separated by commas. For example, 'key1:value1,key2:value2'.";

public static final String KSQL_CUSTOM_METRICS_EXTENSION = "ksql.metrics.extension";
private static final String KSQL_CUSTOM_METRICS_EXTENSION_DOC =
"Extension for supplying custom metrics to be emitted along with "
+ "the engine's default JMX metrics";

public static final String KSQL_STREAMS_PREFIX = "ksql.streams.";

public static final String KSQL_COLLECT_UDF_METRICS = "ksql.udf.collect.metrics";
Expand Down Expand Up @@ -459,6 +464,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
"",
ConfigDef.Importance.LOW,
KSQL_CUSTOM_METRICS_TAGS_DOC
).define(
KSQL_CUSTOM_METRICS_EXTENSION,
ConfigDef.Type.CLASS,
null,
ConfigDef.Importance.LOW,
KSQL_CUSTOM_METRICS_EXTENSION_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
18 changes: 16 additions & 2 deletions ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java
Expand Up @@ -15,14 +15,17 @@

package io.confluent.ksql;

import io.confluent.ksql.internal.KsqlMetricsExtension;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public final class ServiceInfo {

private final String serviceId;
private final Map<String, String> customMetricsTags;
private final Optional<KsqlMetricsExtension> metricsExtension;

/**
* Create an object to be passed from the KSQL context down to the KSQL engine.
Expand All @@ -33,16 +36,23 @@ public static ServiceInfo create(final KsqlConfig ksqlConfig) {
final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final Map<String, String> customMetricsTags =
ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS);
final Optional<KsqlMetricsExtension> metricsExtension = Optional.ofNullable(
ksqlConfig.getConfiguredInstance(
KsqlConfig.KSQL_CUSTOM_METRICS_EXTENSION,
KsqlMetricsExtension.class
));

return new ServiceInfo(serviceId, customMetricsTags);
return new ServiceInfo(serviceId, customMetricsTags, metricsExtension);
}

private ServiceInfo(
final String serviceId,
final Map<String, String> customMetricsTags
final Map<String, String> customMetricsTags,
final Optional<KsqlMetricsExtension> metricsExtension
) {
this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags");
this.metricsExtension = Objects.requireNonNull(metricsExtension, "metricsExtension");
}

public String serviceId() {
Expand All @@ -52,4 +62,8 @@ public String serviceId() {
public Map<String, String> customMetricsTags() {
return customMetricsTags;
}

public Optional<KsqlMetricsExtension> metricsExtension() {
return metricsExtension;
}
}
Expand Up @@ -75,7 +75,8 @@ public KsqlEngine(
processingLogContext,
serviceInfo.serviceId(),
new MetaStoreImpl(functionRegistry),
(engine) -> new KsqlEngineMetrics(engine, serviceInfo.customMetricsTags()));
(engine) -> new KsqlEngineMetrics(
engine, serviceInfo.customMetricsTags(), serviceInfo.metricsExtension()));
}

KsqlEngine(
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
Expand Down Expand Up @@ -58,41 +59,52 @@ public class KsqlEngineMetrics implements Closeable {

private final String ksqlServiceId;
private final Map<String, String> customMetricsTags;
private final Optional<KsqlMetricsExtension> metricsExtension;

private final KsqlEngine ksqlEngine;
private final Metrics metrics;

public KsqlEngineMetrics(
final KsqlEngine ksqlEngine,
final Map<String, String> customMetricsTags) {
this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics(), customMetricsTags);
final Map<String, String> customMetricsTags,
final Optional<KsqlMetricsExtension> metricsExtension) {
this(
METRIC_GROUP_PREFIX,
ksqlEngine,
MetricCollectors.getMetrics(),
customMetricsTags,
metricsExtension);
}

KsqlEngineMetrics(
final String metricGroupPrefix,
final KsqlEngine ksqlEngine,
final Metrics metrics,
final Map<String, String> customMetricsTags) {
final Map<String, String> customMetricsTags,
final Optional<KsqlMetricsExtension> metricsExtension) {
this.ksqlEngine = ksqlEngine;
this.ksqlServiceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlEngine.getServiceId();
this.sensors = new ArrayList<>();
this.countMetrics = new ArrayList<>();
this.metricGroupName = metricGroupPrefix + "-query-stats";
this.customMetricsTags = customMetricsTags;
this.metricsExtension = metricsExtension;

this.metrics = metrics;

configureNumActiveQueries(metrics);
configureNumPersistentQueries(metrics);
this.messagesIn = configureMessagesIn(metrics);
this.totalMessagesIn = configureTotalMessagesIn(metrics);
this.totalBytesIn = configureTotalBytesIn(metrics);
this.messagesOut = configureMessagesOut(metrics);
this.numIdleQueries = configureIdleQueriesSensor(metrics);
this.messageConsumptionByQuery = configureMessageConsumptionByQuerySensor(metrics);
this.errorRate = configureErrorRate(metrics);
configureNumActiveQueries();
configureNumPersistentQueries();
this.messagesIn = configureMessagesIn();
this.totalMessagesIn = configureTotalMessagesIn();
this.totalBytesIn = configureTotalBytesIn();
this.messagesOut = configureMessagesOut();
this.numIdleQueries = configureIdleQueriesSensor();
this.messageConsumptionByQuery = configureMessageConsumptionByQuerySensor();
this.errorRate = configureErrorRate();
Arrays.stream(State.values())
.forEach(state -> configureNumActiveQueriesForGivenState(metrics, state));
.forEach(state -> configureNumActiveQueriesForGivenState(state));

configureCustomMetrics();
}

@Override
Expand Down Expand Up @@ -152,7 +164,7 @@ private void recordErrorRate(final double value) {
this.errorRate.record(value);
}

private Sensor configureErrorRate(final Metrics metrics) {
private Sensor configureErrorRate() {
final String metricName = "error-rate";
final String description =
"The number of messages which were consumed but not processed. "
Expand All @@ -161,40 +173,37 @@ private Sensor configureErrorRate(final Metrics metrics) {
+ "Alternately, a consumed messages may not have been produced, hence "
+ "being effectively dropped. Such messages would also be counted "
+ "toward the error rate.";
return createSensor(metrics, metricName, description, Value::new);
return createSensor(KsqlMetric.of(metricName, description, Value::new));
}

private Sensor configureMessagesOut(final Metrics metrics) {
private Sensor configureMessagesOut() {
final String metricName = "messages-produced-per-sec";
final String description = "The number of messages produced per second across all queries";
return createSensor(metrics, metricName, description, Value::new);
return createSensor(KsqlMetric.of(metricName, description, Value::new));
}

private Sensor configureMessagesIn(final Metrics metrics) {
private Sensor configureMessagesIn() {
final String metricName = "messages-consumed-per-sec";
final String description = "The number of messages consumed per second across all queries";
return createSensor(metrics, metricName, description, Value::new);
return createSensor(KsqlMetric.of(metricName, description, Value::new));
}

private Sensor configureTotalMessagesIn(final Metrics metrics) {
private Sensor configureTotalMessagesIn() {
final String metricName = "messages-consumed-total";
final String description = "The total number of messages consumed across all queries";
return createSensor(metrics, metricName, description, Value::new);
return createSensor(KsqlMetric.of(metricName, description, Value::new));
}

private Sensor configureTotalBytesIn(final Metrics metrics) {
private Sensor configureTotalBytesIn() {
final String metricName = "bytes-consumed-total";
final String description = "The total number of bytes consumed across all queries";
return createSensor(metrics, metricName, description, Value::new);
return createSensor(KsqlMetric.of(metricName, description, Value::new));
}

private void configureNumActiveQueries(final Metrics metrics) {
private void configureNumActiveQueries() {
final String metricName = "num-active-queries";
final String description = "The current number of active queries running in this engine";
createSensor(
metrics,
metricName,
description,
final Supplier<MeasurableStat> statSupplier =
() -> new MeasurableStat() {
@Override
public double measure(final MetricConfig metricConfig, final long l) {
Expand All @@ -205,17 +214,14 @@ public double measure(final MetricConfig metricConfig, final long l) {
public void record(final MetricConfig metricConfig, final double v, final long l) {
// We don't want to record anything, since the engine tracks query counts internally
}
}
);
};
createSensor(KsqlMetric.of(metricName, description, statSupplier));
}

private void configureNumPersistentQueries(final Metrics metrics) {
private void configureNumPersistentQueries() {
final String metricName = "num-persistent-queries";
final String description = "The current number of persistent queries running in this engine";
createSensor(
metrics,
metricName,
description,
final Supplier<MeasurableStat> statSupplier =
() -> new MeasurableStat() {
@Override
public double measure(final MetricConfig metricConfig, final long l) {
Expand All @@ -226,78 +232,63 @@ public double measure(final MetricConfig metricConfig, final long l) {
public void record(final MetricConfig metricConfig, final double v, final long l) {
// We don't want to record anything, since the engine tracks query counts internally
}
}
);
};
createSensor(KsqlMetric.of(metricName, description, statSupplier));
}

private Sensor configureIdleQueriesSensor(final Metrics metrics) {
private Sensor configureIdleQueriesSensor() {
final String metricName = "num-idle-queries";
final String description = "Number of inactive queries";
final Sensor sensor = createSensor(metrics, metricName, description, Value::new);
return sensor;
return createSensor(KsqlMetric.of(metricName, description, Value::new));
}

private Sensor configureMessageConsumptionByQuerySensor(final Metrics metrics) {
final Sensor sensor = createSensor(metrics, "message-consumption-by-query");
private Sensor configureMessageConsumptionByQuerySensor() {
final Sensor sensor = createSensor("message-consumption-by-query");
configureMetric(
metrics,
sensor,
"messages-consumed-max",
"max msgs consumed by query",
Max::new
KsqlMetric.of("messages-consumed-max", "max msgs consumed by query", Max::new)
);
configureMetric(
metrics,
sensor,
"messages-consumed-min",
"min msgs consumed by query",
Min::new
KsqlMetric.of("messages-consumed-min", "min msgs consumed by query", Min::new)
);
configureMetric(
metrics,
sensor,
"messages-consumed-avg",
"mean msgs consumed by query",
Avg::new
KsqlMetric.of("messages-consumed-avg", "mean msgs consumed by query", Avg::new)
);
return sensor;
}

private void configureMetric(
final Metrics metrics,
final Sensor sensor,
final String metricName,
final String description,
final Supplier<MeasurableStat> statSupplier) {
final KsqlMetric metric) {
// legacy
sensor.add(
metrics.metricName(ksqlServiceId + metricName, metricGroupName, description),
statSupplier.get());
metrics.metricName(ksqlServiceId + metric.name(), metricGroupName, metric.description()),
metric.statSupplier().get());
// new
sensor.add(
metrics.metricName(
metricName, ksqlServiceId + metricGroupName, description, customMetricsTags),
statSupplier.get());
metric.name(),
ksqlServiceId + metricGroupName,
metric.description(),
customMetricsTags),
metric.statSupplier().get());
}

private Sensor createSensor(final Metrics metrics, final String sensorName) {
private Sensor createSensor(final String sensorName) {
final Sensor sensor = metrics.sensor(metricGroupName + "-" + sensorName);
sensors.add(sensor);
return sensor;
}

private Sensor createSensor(
final Metrics metrics,
final String metricName,
final String description,
final Supplier<MeasurableStat> statSupplier) {
final Sensor sensor = createSensor(metrics, metricName);
configureMetric(metrics, sensor, metricName, description, statSupplier);
private Sensor createSensor(final KsqlMetric metric) {
final Sensor sensor = createSensor(metric.name());
configureMetric(sensor, metric);
return sensor;
}

private void configureGaugeForState(
final Metrics metrics,
final String name,
final String group,
final Map<String, String> tags,
Expand All @@ -317,27 +308,33 @@ private void configureGaugeForState(
}

private void configureNumActiveQueriesForGivenState(
final Metrics metrics,
final KafkaStreams.State state) {
final String name = state + "-queries";
// legacy
configureGaugeForState(
metrics,
ksqlServiceId + metricGroupName + "-" + name,
metricGroupName,
Collections.emptyMap(),
state
);
// new
configureGaugeForState(
metrics,
name,
ksqlServiceId + metricGroupName,
customMetricsTags,
state
);
}

private void configureCustomMetrics() {
if (!metricsExtension.isPresent()) {
return;
}

final List<KsqlMetric> customMetrics = metricsExtension.get().getCustomMetrics();
customMetrics.forEach(this::createSensor);
}

private static class CountMetric {
private final Gauge<Long> count;
private final MetricName metricName;
Expand Down