feat: add persistent query saturation metric (#7955)
* feat: add persistent query saturation metric

Add a persistent query saturation metric and report it over JMX. Saturation
is computed by sampling total-blocked-time for each stream thread: the total
blocked time is sampled every minute, and saturation is computed by looking
back 5 minutes and determining how long the thread was blocked in that
interval. Saturation is (5 minutes - blocked time) / (5 minutes).
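
For illustration, here is a minimal sketch of that windowed computation. The class,
method names, and data structures below are hypothetical, not the ones used in this
patch: keep the periodic samples of cumulative blocked time, discard samples older
than the look-back window, and derive saturation from the oldest and newest
remaining samples.

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the windowed saturation computation described above;
// names and data structures are illustrative, not the ones in this patch.
final class SaturationWindowSketch {
  private static final Duration WINDOW = Duration.ofMinutes(5);

  // cumulative total-blocked-time samples for one stream thread, keyed by sample time
  private final NavigableMap<Instant, Duration> samples = new TreeMap<>();

  void record(final Instant sampleTime, final Duration totalBlockedTime) {
    samples.put(sampleTime, totalBlockedTime);
    // keep only samples within the look-back window
    samples.headMap(sampleTime.minus(WINDOW)).clear();
  }

  // saturation = (window - blocked time in window) / window,
  // computed from the oldest and newest samples still in the window
  double saturation() {
    final Map.Entry<Instant, Duration> oldest = samples.firstEntry();
    final Map.Entry<Instant, Duration> newest = samples.lastEntry();
    if (oldest == null || oldest.getKey().equals(newest.getKey())) {
      return 0.0;  // not enough history yet
    }
    final long windowMs = Duration.between(oldest.getKey(), newest.getKey()).toMillis();
    final long blockedMs = Math.min(
        newest.getValue().minus(oldest.getValue()).toMillis(), windowMs);
    return (windowMs - blockedMs) / (double) windowMs;
  }
}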

This patch also adds a reporter that exposes reported data points over JMX.
The reporter registers a new metric for every new data point name/tag
combination it sees, and implements each metric as a gauge that returns the
latest data point for that name/tag, subject to a staleness threshold.

* add logs

* review feedback

* rename a couple things

* update metric/group names

* fix bug

* checkstyle
rodesai committed Sep 2, 2021
1 parent fd9faf2 commit eed625b
Showing 7 changed files with 1,055 additions and 5 deletions.
New file: io/confluent/ksql/internal/JmxDataPointsReporter.java
@@ -0,0 +1,98 @@
/*
 * Copyright 2021 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

package io.confluent.ksql.internal;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;

public class JmxDataPointsReporter implements MetricsReporter {
  private final Metrics metrics;
  private final String group;
  private final Map<MetricName, DataPointBasedGauge> gauges = new ConcurrentHashMap<>();
  private final Duration staleThreshold;

  public JmxDataPointsReporter(
      final Metrics metrics,
      final String group,
      final Duration staleThreshold
  ) {
    this.metrics = Objects.requireNonNull(metrics, "metrics");
    this.group = Objects.requireNonNull(group, "group");
    this.staleThreshold = Objects.requireNonNull(staleThreshold, "staleThreshold");
  }

  @Override
  public void report(final List<DataPoint> dataPoints) {
    dataPoints.forEach(this::report);
  }

  private void report(final DataPoint dataPoint) {
    final MetricName metricName
        = metrics.metricName(dataPoint.getName(), group, dataPoint.getTags());
    if (gauges.containsKey(metricName)) {
      gauges.get(metricName).dataPointRef.set(dataPoint);
    } else {
      gauges.put(metricName, new DataPointBasedGauge(dataPoint, staleThreshold));
      metrics.addMetric(metricName, gauges.get(metricName));
    }
  }

  @Override
  public void cleanup(final String name, final Map<String, String> tags) {
    final MetricName metricName = metrics.metricName(name, group, tags);
    metrics.removeMetric(metricName);
    gauges.remove(metricName);
  }

  @Override
  public void close() {
  }

  @Override
  public void configure(final Map<String, ?> map) {
  }

  private static final class DataPointBasedGauge implements Gauge<Object> {
    private final AtomicReference<DataPoint> dataPointRef;
    private final Duration staleThreshold;

    private DataPointBasedGauge(
        final DataPoint initial,
        final Duration staleThreshold
    ) {
      this.dataPointRef = new AtomicReference<>(initial);
      this.staleThreshold = staleThreshold;
    }

    @Override
    public Object value(final MetricConfig metricConfig, final long now) {
      final DataPoint dataPoint = dataPointRef.get();
      if (dataPoint.getTime().isAfter(Instant.ofEpochMilli(now).minus(staleThreshold))) {
        return dataPoint.getValue();
      }
      return null;
    }
  }
}
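
A hypothetical usage sketch of the reporter above: the group name, metric name, tags,
and the DataPoint constructor argument order shown here are assumptions for
illustration only, not taken from this patch.

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;

// Hypothetical usage of JmxDataPointsReporter; the group name, metric name,
// tags, and DataPoint constructor arguments below are illustrative assumptions.
public final class ReporterUsageSketch {
  public static void main(final String[] args) {
    final Metrics metrics = new Metrics();
    final JmxDataPointsReporter reporter =
        new JmxDataPointsReporter(metrics, "ksqldb-utilization", Duration.ofMinutes(6));

    // Publish a saturation sample for one query; the reporter registers a gauge
    // for this name/tag combination and exposes it over JMX via the Metrics registry.
    reporter.report(List.of(
        new DataPoint(
            Instant.now(),                           // sample time
            "query-saturation",                      // data point name
            0.35,                                    // saturation value
            Map.of("query-id", "CTAS_EXAMPLE_0")))); // tags

    // When the query is removed, drop the corresponding gauge.
    reporter.cleanup("query-saturation", Map.of("query-id", "CTAS_EXAMPLE_0"));
  }
}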
Changed file: MetricsReporter.java (the interface implemented by JmxDataPointsReporter above)
@@ -24,7 +24,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.Supplier;
 
 /**
  * This interface is used to report metrics as data points to a
@@ -107,7 +106,12 @@ public int hashCode() {
   /**
    * Reports a list of data points.
    *
-   * @param dataPointSupplier supplier of the list of data points
+   * @param dataPoints the list of data points
    */
-  void report(Supplier<List<DataPoint>> dataPointSupplier);
-}
+  void report(List<DataPoint> dataPoints);
+
+  /**
+   * Notifies the reporter that the metric with name and tags can be cleaned up
+   */
+  void cleanup(String name, Map<String, String> tags);
+}
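
For context, the reporter above relies on four accessors on DataPoint
(getTime, getName, getValue, getTags). A minimal sketch of the shape it assumes
is below; the real class is part of this patch and its constructor and value
type may differ.

import java.time.Instant;
import java.util.Map;
import java.util.Objects;

// Minimal sketch of the DataPoint shape that JmxDataPointsReporter relies on;
// the constructor and field types here are assumptions for illustration.
public final class DataPoint {
  private final Instant time;
  private final String name;
  private final Object value;
  private final Map<String, String> tags;

  public DataPoint(
      final Instant time,
      final String name,
      final Object value,
      final Map<String, String> tags
  ) {
    this.time = Objects.requireNonNull(time, "time");
    this.name = Objects.requireNonNull(name, "name");
    this.value = Objects.requireNonNull(value, "value");
    this.tags = Objects.requireNonNull(tags, "tags");
  }

  public Instant getTime() {
    return time;
  }

  public String getName() {
    return name;
  }

  public Object getValue() {
    return value;
  }

  public Map<String, String> getTags() {
    return tags;
  }
}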
Changed file: QueryMetadataImpl.java
@@ -73,9 +73,9 @@ public class QueryMetadataImpl implements QueryMetadata {
 
   private volatile boolean everStarted = false;
   protected volatile boolean closed = false;
-  private volatile KafkaStreams kafkaStreams;
+  // These fields don't need synchronization because they are initialized in initialize() before
+  // the object is made available to other threads.
+  private KafkaStreams kafkaStreams;
   private boolean initialized = false;
   private boolean corruptionCommandTopic = false;
 
