diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/JmxDataPointsReporter.java b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/JmxDataPointsReporter.java
new file mode 100644
index 000000000000..1843e53359fc
--- /dev/null
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/JmxDataPointsReporter.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2021 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.internal;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+
+public class JmxDataPointsReporter implements MetricsReporter {
+  private final Metrics metrics;
+  private final String group;
+  private final Map<MetricName, DataPointBasedGauge> gauges = new ConcurrentHashMap<>();
+  private final Duration staleThreshold;
+
+  public JmxDataPointsReporter(
+      final Metrics metrics,
+      final String group,
+      final Duration staleThreshold
+  ) {
+    this.metrics = Objects.requireNonNull(metrics, "metrics");
+    this.group = Objects.requireNonNull(group, "group");
+    this.staleThreshold = Objects.requireNonNull(staleThreshold, "staleThreshold");
+  }
+
+  @Override
+  public void report(final List<DataPoint> dataPoints) {
+    dataPoints.forEach(this::report);
+  }
+
+  private void report(final DataPoint dataPoint) {
+    final MetricName metricName
+        = metrics.metricName(dataPoint.getName(), group, dataPoint.getTags());
+    if (gauges.containsKey(metricName)) {
+      gauges.get(metricName).dataPointRef.set(dataPoint);
+    } else {
+      gauges.put(metricName, new DataPointBasedGauge(dataPoint, staleThreshold));
+      metrics.addMetric(metricName, gauges.get(metricName));
+    }
+  }
+
+  @Override
+  public void cleanup(final String name, final Map<String, String> tags) {
+    final MetricName metricName = metrics.metricName(name, group, tags);
+    metrics.removeMetric(metricName);
+    gauges.remove(metricName);
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void configure(final Map<String, ?> map) {
+  }
+
+  private static final class DataPointBasedGauge implements Gauge<Object> {
+    private final AtomicReference<DataPoint> dataPointRef;
+    private final Duration staleThreshold;
+
+    private DataPointBasedGauge(
+        final DataPoint initial,
+        final Duration staleThreshold
+    ) {
+      this.dataPointRef = new AtomicReference<>(initial);
+      this.staleThreshold = staleThreshold;
+    }
+
+    @Override
+    public Object value(final MetricConfig metricConfig, final long now) {
+      final DataPoint dataPoint = dataPointRef.get();
+      if (dataPoint.getTime().isAfter(Instant.ofEpochMilli(now).minus(staleThreshold))) {
+        return dataPoint.getValue();
+      }
+      return null;
+    }
+  }
+}
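For orientation, a minimal usage sketch of the reporter added above — `ReporterSketch` is hypothetical and not part of the patch; the `DataPoint` constructor shape is taken from the tests later in this diff:

```java
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.internal.JmxDataPointsReporter;
import io.confluent.ksql.internal.MetricsReporter.DataPoint;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import org.apache.kafka.common.metrics.Metrics;

public final class ReporterSketch {
  public static void main(final String[] args) {
    final Metrics metrics = new Metrics();
    final JmxDataPointsReporter reporter =
        new JmxDataPointsReporter(metrics, "ksqldb_utilization", Duration.ofMinutes(1));

    // The first report of a name+tags pair registers a gauge; subsequent reports
    // with the same name and tags update the gauge's backing DataPoint in place.
    reporter.report(Collections.singletonList(
        new DataPoint(Instant.now(), "node-query-saturation", 0.25, ImmutableMap.of())));

    // If no fresh DataPoint arrives within the stale threshold, the gauge reads null;
    // cleanup() removes the metric from the registry entirely.
    reporter.cleanup("node-query-saturation", ImmutableMap.of());
    metrics.close();
  }
}
```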
diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/MetricsReporter.java b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/MetricsReporter.java
index 7084521926b2..68ccb09ebd9b 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/MetricsReporter.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/MetricsReporter.java
@@ -24,7 +24,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.Supplier;
 
 /**
  * This interface is used to report metrics as data points to a
@@ -107,7 +106,12 @@ public int hashCode() {
   /**
    * Reports a list of data points.
    *
-   * @param dataPointSupplier supplier of the list of data points
+   * @param dataPoints the list of data points
    */
-  void report(Supplier<List<DataPoint>> dataPointSupplier);
-}
\ No newline at end of file
+  void report(List<DataPoint> dataPoints);
+
+  /**
+   * Notifies the reporter that the metric with the given name and tags can be cleaned up.
+   */
+  void cleanup(String name, Map<String, String> tags);
+}
diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java
index 6904614cf89f..fe210f466270 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java
@@ -73,9 +73,9 @@ public class QueryMetadataImpl implements QueryMetadata {
   private volatile boolean everStarted = false;
   protected volatile boolean closed = false;
+  private volatile KafkaStreams kafkaStreams;
   // These fields don't need synchronization because they are initialized in initialize() before
   // the object is made available to other threads.
-  private KafkaStreams kafkaStreams;
   private boolean initialized = false;
   private boolean corruptionCommandTopic = false;
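The `MetricsReporter` change above replaces the `Supplier`-based `report` with an eager list and adds a `cleanup` hook. A sketch of a trivial conforming implementation — the class name `LoggingMetricsReporter` is hypothetical, and it assumes the interface also declares the `close()`/`configure()` methods that `JmxDataPointsReporter` overrides:

```java
package io.confluent.ksql.internal;

import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: logs each data point instead of exporting it.
public class LoggingMetricsReporter implements MetricsReporter {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class);

  @Override
  public void report(final List<DataPoint> dataPoints) {
    dataPoints.forEach(p -> LOG.info("{} = {} {}", p.getName(), p.getValue(), p.getTags()));
  }

  @Override
  public void cleanup(final String name, final Map<String, String> tags) {
    LOG.info("dropping metric {} {}", name, tags);
  }

  @Override
  public void close() {
  }

  @Override
  public void configure(final Map<String, ?> config) {
  }
}
```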
diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/utilization/PersistentQuerySaturationMetrics.java b/ksqldb-engine/src/main/java/io/confluent/ksql/utilization/PersistentQuerySaturationMetrics.java
new file mode 100644
index 000000000000..9a7b9047de0d
--- /dev/null
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/utilization/PersistentQuerySaturationMetrics.java
@@ -0,0 +1,365 @@
+/*
+ * Copyright 2021 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.utilization;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import io.confluent.ksql.engine.KsqlEngine;
+import io.confluent.ksql.internal.MetricsReporter;
+import io.confluent.ksql.internal.MetricsReporter.DataPoint;
+import io.confluent.ksql.query.QueryId;
+import io.confluent.ksql.util.PersistentQueryMetadata;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.streams.KafkaStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PersistentQuerySaturationMetrics implements Runnable {
+  private static final Logger LOGGER
+      = LoggerFactory.getLogger(PersistentQuerySaturationMetrics.class);
+
+  private static final String QUERY_SATURATION = "node-query-saturation";
+  private static final String NODE_QUERY_SATURATION = "max-node-query-saturation";
+  private static final String QUERY_THREAD_SATURATION = "query-thread-saturation";
+  private static final String STREAMS_TOTAL_BLOCKED_TIME = "blocked-time-ns-total";
+  private static final String STREAMS_THREAD_START_TIME = "thread-start-time";
+  private static final String STREAMS_THREAD_METRICS_GROUP = "stream-thread-metrics";
+  private static final String THREAD_ID = "thread-id";
+  private static final String QUERY_ID = "query-id";
+
+  private final Map<String, KafkaStreamsSaturation> perKafkaStreamsStats = new HashMap<>();
+  private final KsqlEngine engine;
+  private final Supplier<Instant> time;
+  private final MetricsReporter reporter;
+  private final Duration window;
+  private final Duration sampleMargin;
+
+  public PersistentQuerySaturationMetrics(
+      final KsqlEngine engine,
+      final MetricsReporter reporter,
+      final Duration window,
+      final Duration sampleMargin
+  ) {
+    this(Instant::now, engine, reporter, window, sampleMargin);
+  }
+
+  @VisibleForTesting
+  PersistentQuerySaturationMetrics(
+      final Supplier<Instant> time,
+      final KsqlEngine engine,
+      final MetricsReporter reporter,
+      final Duration window,
+      final Duration sampleMargin
+  ) {
+    this.time = Objects.requireNonNull(time, "time");
+    this.engine = Objects.requireNonNull(engine, "engine");
+    this.reporter = Objects.requireNonNull(reporter, "reporter");
+    this.window = Objects.requireNonNull(window, "window");
+    this.sampleMargin = Objects.requireNonNull(sampleMargin, "sampleMargin");
+  }
+
+  @Override
+  public void run() {
+    final Instant now = time.get();
+
+    try {
+      final Collection<PersistentQueryMetadata> queries = engine.getPersistentQueries();
+      final Optional<Double> saturation = queries.stream()
+          .collect(Collectors.groupingBy(PersistentQueryMetadata::getQueryApplicationId))
+          .entrySet()
+          .stream()
+          .map(e -> measure(now, e.getKey(), e.getValue()))
+          .max(PersistentQuerySaturationMetrics::compareSaturation)
+          .orElse(Optional.of(0.0));
+      saturation.ifPresent(s -> report(now, s));
+
+      final Set<String> appIds = queries.stream()
+          .map(PersistentQueryMetadata::getQueryApplicationId)
+          .collect(Collectors.toSet());
+      for (final String appId
+          : Sets.difference(new HashSet<>(perKafkaStreamsStats.keySet()), appIds)) {
+        perKafkaStreamsStats.get(appId).cleanup(reporter);
+        perKafkaStreamsStats.remove(appId);
+      }
+    } catch (final RuntimeException e) {
+      LOGGER.error("Error collecting saturation", e);
+      throw e;
+    }
+  }
+
+  private static int compareSaturation(final Optional<Double> a, final Optional<Double> b) {
+    // A missing value means we could not compute saturation for some unit (thread, runtime, etc),
+    // and therefore the result of the comparison is unknown.
+    if (!a.isPresent()) {
+      return 1;
+    } else if (!b.isPresent()) {
+      return -1;
+    } else {
+      return Double.compare(a.get(), b.get());
+    }
+  }
+
+  private Optional<Double> measure(
+      final Instant now,
+      final String appId,
+      final List<PersistentQueryMetadata> queryMetadata
+  ) {
+    final KafkaStreamsSaturation ksSaturation = perKafkaStreamsStats.computeIfAbsent(
+        appId,
+        k -> new KafkaStreamsSaturation(window, sampleMargin)
+    );
+    final Optional<KafkaStreams> kafkaStreams = queryMetadata.stream()
+        .filter(q -> q.getKafkaStreams() != null)
+        .map(PersistentQueryMetadata::getKafkaStreams)
+        .findFirst();
+    if (!kafkaStreams.isPresent()) {
+      return Optional.of(0.0);
+    }
+    final List<QueryId> queryIds = queryMetadata.stream()
+        .map(PersistentQueryMetadata::getQueryId)
+        .collect(Collectors.toList());
+    return ksSaturation.measure(now, queryIds, kafkaStreams.get(), reporter);
+  }
+
+  private void report(final Instant now, final double saturation) {
+    LOGGER.info("reporting node-level saturation {}", saturation);
+    reporter.report(
+        ImmutableList.of(
+            new DataPoint(
+                now,
+                NODE_QUERY_SATURATION,
+                saturation,
+                Collections.emptyMap()
+            )
+        )
+    );
+  }
+
+  private static final class KafkaStreamsSaturation {
+    private final Set<QueryId> queryIds = new HashSet<>();
+    private final Map<String, ThreadSaturation> perThreadSaturation = new HashMap<>();
+    private final Duration window;
+    private final Duration sampleMargin;
+
+    private KafkaStreamsSaturation(
+        final Duration window,
+        final Duration sampleMargin
+    ) {
+      this.window = Objects.requireNonNull(window, "window");
+      this.sampleMargin = Objects.requireNonNull(sampleMargin, "sampleMargin");
+    }
+
+    private void reportThreadSaturation(
+        final Instant now,
+        final double saturation,
+        final String name,
+        final MetricsReporter reporter
+    ) {
+      LOGGER.info("Reporting thread saturation {} for {}", saturation, name);
+      reporter.report(ImmutableList.of(
+          new DataPoint(
+              now,
+              QUERY_THREAD_SATURATION,
+              saturation,
+              ImmutableMap.of(THREAD_ID, name)
+          )
+      ));
+    }
+
+    private void reportQuerySaturation(
+        final Instant now,
+        final double saturation,
+        final MetricsReporter reporter
+    ) {
+      for (final QueryId queryId : queryIds) {
+        LOGGER.info("Reporting query saturation {} for {}", saturation, queryId);
+        reporter.report(ImmutableList.of(
+            new DataPoint(
+                now,
+                QUERY_SATURATION,
+                saturation,
+                ImmutableMap.of(QUERY_ID, queryId.toString())
+            )
+        ));
+      }
+    }
+
+    private void updateQueryIds(final List<QueryId> newQueryIds, final MetricsReporter reporter) {
+      for (final QueryId queryId : Sets.difference(queryIds, new HashSet<>(newQueryIds))) {
+        reporter.cleanup(QUERY_SATURATION, ImmutableMap.of(QUERY_ID, queryId.toString()));
+      }
+      queryIds.clear();
+      queryIds.addAll(newQueryIds);
+    }
+
+    private Map<String, Map<String, Metric>> metricsByThread(final KafkaStreams kafkaStreams) {
+      return kafkaStreams.metrics().values().stream()
+          .filter(m -> m.metricName().group().equals(STREAMS_THREAD_METRICS_GROUP))
+          .collect(Collectors.groupingBy(
+              m -> m.metricName().tags().get(THREAD_ID),
+              Collectors.toMap(m -> m.metricName().name(), m -> m, (a, b) -> a)
+          ));
+    }
+
+    private Optional<Double> measure(
+        final Instant now,
+        final List<QueryId> queryIds,
+        final KafkaStreams kafkaStreams,
+        final MetricsReporter reporter
+    ) {
+      updateQueryIds(queryIds, reporter);
+      final Map<String, Map<String, Metric>> byThread = metricsByThread(kafkaStreams);
+      Optional<Double> saturation = Optional.of(0.0);
+      for (final Map.Entry<String, Map<String, Metric>> entry : byThread.entrySet()) {
+        final String threadName = entry.getKey();
+        final Map<String, Metric> metricsForThread = entry.getValue();
+        if (!metricsForThread.containsKey(STREAMS_TOTAL_BLOCKED_TIME)
+            || !metricsForThread.containsKey(STREAMS_THREAD_START_TIME)) {
+          LOGGER.info("Missing required metrics for thread: {}", threadName);
+          continue;
+        }
+        final double totalBlocked =
+            (double) metricsForThread.get(STREAMS_TOTAL_BLOCKED_TIME).metricValue();
+        final long startTime =
+            (long) metricsForThread.get(STREAMS_THREAD_START_TIME).metricValue();
+        final ThreadSaturation threadSaturation = perThreadSaturation.computeIfAbsent(
+            threadName,
+            k -> {
+              LOGGER.debug("Adding saturation for new thread: {}", k);
+              return new ThreadSaturation(threadName, startTime, window, sampleMargin);
+            }
+        );
+        LOGGER.debug("Record thread {} sample {} {}", threadName, totalBlocked, startTime);
+        final BlockedTimeSample blockedTimeSample = new BlockedTimeSample(now, totalBlocked);
+        final Optional<Double> measured = threadSaturation.measure(now, blockedTimeSample);
+        LOGGER.debug(
+            "Measured value for thread {}: {}",
+            threadName,
+            measured.map(Object::toString).orElse("")
+        );
+        measured.ifPresent(m -> reportThreadSaturation(now, m, threadName, reporter));
+        saturation = compareSaturation(saturation, measured) > 0 ? saturation : measured;
+      }
+      saturation.ifPresent(s -> reportQuerySaturation(now, s, reporter));
+      for (final String threadName
+          : Sets.difference(new HashSet<>(perThreadSaturation.keySet()), byThread.keySet())) {
+        perThreadSaturation.remove(threadName);
+        reporter.cleanup(QUERY_THREAD_SATURATION, ImmutableMap.of(THREAD_ID, threadName));
+      }
+      return saturation;
+    }
+
+    private void cleanup(final MetricsReporter reporter) {
+      for (final String threadName : perThreadSaturation.keySet()) {
+        reporter.cleanup(QUERY_THREAD_SATURATION, ImmutableMap.of(THREAD_ID, threadName));
+      }
+      for (final QueryId queryId : queryIds) {
+        reporter.cleanup(QUERY_SATURATION, ImmutableMap.of(QUERY_ID, queryId.toString()));
+      }
+    }
+  }
+
+  private static final class ThreadSaturation {
+    private final String threadName;
+    private final List<BlockedTimeSample> samples = new LinkedList<>();
+    private final Instant startTime;
+    private final Duration window;
+    private final Duration sampleMargin;
+
+    private ThreadSaturation(
+        final String threadName,
+        final long startTime,
+        final Duration window,
+        final Duration sampleMargin
+    ) {
+      this.threadName = Objects.requireNonNull(threadName, "threadName");
+      this.startTime = Instant.ofEpochMilli(startTime);
+      this.window = Objects.requireNonNull(window, "window");
+      this.sampleMargin = Objects.requireNonNull(sampleMargin, "sampleMargin");
+    }
+
+    private boolean inRange(final Instant time, final Instant start, final Instant end) {
+      return time.isAfter(start) && time.isBefore(end);
+    }
+
+    private Optional<Double> measure(final Instant now, final BlockedTimeSample current) {
+      final Instant windowStart = now.minus(window);
+      final Instant earliest = now.minus(window.plus(sampleMargin));
+      final Instant latest = now.minus(window.minus(sampleMargin));
+      LOGGER.debug(
+          "{}: record and measure with now {}, window {} ({} : {})",
+          threadName,
+          now,
+          windowStart,
+          earliest,
+          latest
+      );
+      samples.add(current);
+      samples.removeIf(s -> s.timestamp.isBefore(earliest));
+      if (!inRange(samples.get(0).timestamp, earliest, latest) && !startTime.isAfter(windowStart)) {
+        return Optional.empty();
+      }
+      final BlockedTimeSample startSample = samples.get(0);
+      LOGGER.debug("{}: start sample {}", threadName, startSample);
+      double blocked =
+          Math.max(current.totalBlockedTime - startSample.totalBlockedTime, 0);
+      Instant observedStart = samples.get(0).timestamp;
+      if (startTime.isAfter(windowStart)) {
+        LOGGER.debug("{}: start time {} is after window start", threadName, startTime);
+        blocked += Duration.between(windowStart, startTime).toNanos();
+        observedStart = windowStart;
+      }
+
+      final Duration duration = Duration.between(observedStart, current.timestamp);
+      final double durationNs = duration.toNanos();
+      return Optional.of((durationNs - Math.min(blocked, durationNs)) / durationNs);
+    }
+  }
+
+  private static final class BlockedTimeSample {
+    private final Instant timestamp;
+    private final double totalBlockedTime;
+
+    private BlockedTimeSample(final Instant timestamp, final double totalBlockedTime) {
+      this.timestamp = timestamp;
+      this.totalBlockedTime = totalBlockedTime;
+    }
+
+    @Override
+    public String toString() {
+      return "BlockedTimeSample{"
+          + "timestamp=" + timestamp
+          + ", totalBlockedTime=" + totalBlockedTime
+          + '}';
+    }
+  }
+}
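To make the saturation formula concrete, here is a worked instance of the arithmetic in `ThreadSaturation.measure`, using the numbers from `shouldComputeSaturationForThread` in the test below: two samples taken one window (10 minutes) apart, with the blocked-time counter advancing from 1 minute to 2 minutes between them. The wrapper class `SaturationArithmetic` is hypothetical:

```java
import java.time.Duration;

public final class SaturationArithmetic {
  public static void main(final String[] args) {
    // Window spanned by the oldest and newest samples.
    final double durationNs = Duration.ofMinutes(10).toNanos();
    // Delta of the monotonically increasing blocked-time-ns-total counter.
    final double blockedNs =
        Duration.ofMinutes(2).toNanos() - Duration.ofMinutes(1).toNanos();
    // saturation = (duration - blocked) / duration = (10 - 1) / 10 = 0.9
    final double saturation = (durationNs - Math.min(blockedNs, durationNs)) / durationNs;
    System.out.println(saturation); // prints 0.9
  }
}
```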
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/internal/JmxDataPointsReporterTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/internal/JmxDataPointsReporterTest.java
new file mode 100644
index 000000000000..ba4294ad3399
--- /dev/null
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/internal/JmxDataPointsReporterTest.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2021 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.internal;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.confluent.ksql.internal.MetricsReporter.DataPoint;
+import java.time.Duration;
+import java.time.Instant;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+public class JmxDataPointsReporterTest {
+  private static final String GROUP = "group";
+  private static final Duration STALE_THRESHOLD = Duration.ofMinutes(1);
+  private static final Instant A_TIME = Instant.now();
+
+  @Mock
+  private Metrics metrics;
+  @Mock
+  private MetricName metricName;
+  @Captor
+  private ArgumentCaptor<Gauge<Object>> metricCaptor;
+  private JmxDataPointsReporter reporter;
+
+  @Rule
+  public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+  @Before
+  public void setup() {
+    when(metrics.metricName(any(), any(), anyMap())).thenReturn(metricName);
+    reporter = new JmxDataPointsReporter(metrics, GROUP, STALE_THRESHOLD);
+  }
+
+  @Test
+  public void shouldAddGaugeForNewPoint() {
+    // When:
+    reporter.report(ImmutableList.of(
+        new DataPoint(
+            A_TIME,
+            "baz",
+            123,
+            ImmutableMap.of("foo", "bar")
+        )
+    ));
+
+    // Then:
+    verify(metrics).addMetric(same(metricName), metricCaptor.capture());
+    assertThat(metricCaptor.getValue().value(null, A_TIME.toEpochMilli()), is(123));
+  }
+
+  @Test
+  public void shouldUpdateGauge() {
+    // Given:
+    reporter.report(ImmutableList.of(
+        new DataPoint(
+            A_TIME,
+            "baz",
+            123,
+            ImmutableMap.of("foo", "bar")
+        )
+    ));
+
+    // When:
+    reporter.report(ImmutableList.of(
+        new DataPoint(
+            A_TIME.plusSeconds(1),
+            "baz",
+            456,
+            ImmutableMap.of("foo", "bar")
+        )
+    ));
+
+    // Then:
+    verify(metrics).addMetric(same(metricName), metricCaptor.capture());
+    assertThat(
+        metricCaptor.getValue().value(null, A_TIME.plusSeconds(1).toEpochMilli()),
+        is(456)
+    );
+  }
+
+  @Test
+  public void shouldCleanup() {
+    // Given:
+    reporter.report(ImmutableList.of(
+        new DataPoint(
+            A_TIME,
+            "baz",
+            123,
+            ImmutableMap.of("foo", "bar")
+        )
+    ));
+
+    // When:
+    reporter.cleanup("baz", ImmutableMap.of("foo", "bar"));
+
+    // Then:
+    verify(metrics).removeMetric(metricName);
+  }
+
+  @Test
+  public void shouldReturnNullForStalePoint() {
+    // When:
+    reporter.report(ImmutableList.of(
+        new DataPoint(
+            A_TIME,
+            "baz",
+            123,
+            ImmutableMap.of("foo", "bar")
+        )
+    ));
+
+    // Then:
+    verify(metrics).addMetric(same(metricName), metricCaptor.capture());
+    assertThat(
+        metricCaptor.getValue().value(
+            null,
+            A_TIME.plus(STALE_THRESHOLD.multipliedBy(2)).toEpochMilli()),
+        nullValue()
+    );
+  }
+}
\ No newline at end of file
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/utilization/PersistentQuerySaturationMetricsTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/utilization/PersistentQuerySaturationMetricsTest.java
new file mode 100644
index 000000000000..90ab3e7cd7e5
--- /dev/null
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/utilization/PersistentQuerySaturationMetricsTest.java
@@ -0,0 +1,408 @@
+/*
+ * Copyright 2021 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.utilization;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.number.IsCloseTo.closeTo;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.confluent.ksql.engine.KsqlEngine;
+import io.confluent.ksql.internal.MetricsReporter;
+import io.confluent.ksql.internal.MetricsReporter.DataPoint;
+import io.confluent.ksql.query.QueryId;
+import io.confluent.ksql.util.PersistentQueryMetadata;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.streams.KafkaStreams;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+public class PersistentQuerySaturationMetricsTest {
+  private static final Duration WINDOW = Duration.ofMinutes(10);
+  private static final Duration SAMPLE_MARGIN = Duration.ofSeconds(15);
+  private static final String APP_ID1 = "ping";
+  private static final String APP_ID2 = "pong";
+  private static final QueryId QUERY_ID1 = new QueryId("hootie");
+  private static final QueryId QUERY_ID2 = new QueryId("hoo");
+  private static final QueryId QUERY_ID3 = new QueryId("boom");
+
+  @Mock
+  private MetricsReporter reporter;
+  @Mock
+  private Supplier<Instant> clock;
+  @Mock
+  private KafkaStreams kafkaStreams1;
+  @Mock
+  private KafkaStreams kafkaStreams2;
+  @Mock
+  private PersistentQueryMetadata query1;
+  @Mock
+  private PersistentQueryMetadata query2;
+  @Mock
+  private PersistentQueryMetadata query3;
+  @Mock
+  private KsqlEngine engine;
+  @Captor
+  private ArgumentCaptor<List<DataPoint>> reportedPoints;
+
+  private PersistentQuerySaturationMetrics collector;
+
+  @Rule
+  public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+  @Before
+  public void setup() {
+    when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(query1, query2, query3));
+    when(query1.getQueryId()).thenReturn(QUERY_ID1);
+    when(query1.getKafkaStreams()).thenReturn(kafkaStreams1);
+    when(query1.getQueryApplicationId()).thenReturn(APP_ID1);
+    when(query2.getQueryId()).thenReturn(QUERY_ID2);
+    when(query2.getKafkaStreams()).thenReturn(kafkaStreams2);
+    when(query2.getQueryApplicationId()).thenReturn(APP_ID2);
+    when(query3.getQueryId()).thenReturn(QUERY_ID3);
+    when(query3.getKafkaStreams()).thenReturn(kafkaStreams1);
+    when(query3.getQueryApplicationId()).thenReturn(APP_ID1);
+    collector = new PersistentQuerySaturationMetrics(
+        clock,
+        engine,
+        reporter,
+        WINDOW,
+        SAMPLE_MARGIN
+    );
+  }
+
+  @Test
+  public void shouldComputeSaturationForThread() {
+    // Given:
+    final Instant start = Instant.now();
+    when(clock.get()).thenReturn(start);
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(1));
+    collector.run();
+    when(clock.get()).thenReturn(start.plus(WINDOW));
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(2));
+
+    // When:
+    collector.run();
+
+    // Then:
+    final DataPoint point = verifyAndGetLatestDataPoint(
+        "query-thread-saturation",
+        ImmutableMap.of("thread-id", "t1")
+    );
+    assertThat((Double) point.getValue(), closeTo(.9, .01));
+  }
+
+  @Test
+  public void shouldComputeSaturationForQuery() {
+    // Given:
+    final Instant start = Instant.now();
+    when(clock.get()).thenReturn(start);
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(2))
+        .withThreadStartTime("t2", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t2", Duration.ofMinutes(2));
+    collector.run();
+    when(clock.get()).thenReturn(start.plus(WINDOW));
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(3))
+        .withThreadStartTime("t2", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t2", Duration.ofMinutes(7));
+
+    // When:
+    collector.run();
+
+    // Then:
+    final DataPoint point = verifyAndGetLatestDataPoint(
+        "node-query-saturation",
+        ImmutableMap.of("query-id", "hootie")
+    );
+    assertThat((Double) point.getValue(), closeTo(.9, .01));
+  }
+
+  @Test
+  public void shouldComputeSaturationForNode() {
+    // Given:
+    final Instant start = Instant.now();
+    when(clock.get()).thenReturn(start);
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(2));
+    givenMetrics(kafkaStreams2)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(2));
+    collector.run();
+    when(clock.get()).thenReturn(start.plus(WINDOW));
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(3));
+    givenMetrics(kafkaStreams2)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(7));
+
+    // When:
+    collector.run();
+
+    // Then:
+    final DataPoint point = verifyAndGetLatestDataPoint(
+        "max-node-query-saturation",
+        Collections.emptyMap()
+    );
+    assertThat((Double) point.getValue(), closeTo(.9, .01));
+  }
+
+  @Test
+  public void shouldIgnoreSamplesOutsideMargin() {
+    // Given:
+    final Instant start = Instant.now();
+    when(clock.get()).thenReturn(start);
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(2));
+    collector.run();
+    when(clock.get()).thenReturn(start.plus(WINDOW.plus(SAMPLE_MARGIN.multipliedBy(2))));
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(3));
+
+    // When:
+    collector.run();
+
+    // Then:
+    verifyNoDataPoints("max-node-query-saturation", Collections.emptyMap());
+  }
+
+  @Test
+  public void shouldCountThreadBlockedToStart() {
+    // Given:
+    final Instant start = Instant.now();
+    when(clock.get()).thenReturn(start);
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start)
+        .withBlockedTime("t1", Duration.ofMinutes(0));
+    collector.run();
+    when(clock.get()).thenReturn(start.plus(Duration.ofMinutes(2)));
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start)
+        .withBlockedTime("t1", Duration.ofMinutes(0));
+
+    // When:
+    collector.run();
+
+    // Then:
+    final DataPoint point = verifyAndGetLatestDataPoint(
+        "query-thread-saturation",
+        ImmutableMap.of("thread-id", "t1")
+    );
+    assertThat((Double) point.getValue(), closeTo(.2, .01));
+  }
+
+  @Test
+  public void shouldCleanupThreadMetric() {
+    // Given:
+    final Instant start = Instant.now();
+    when(clock.get()).thenReturn(start);
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start)
+        .withBlockedTime("t1", Duration.ofMinutes(0));
+    collector.run();
+    when(clock.get()).thenReturn(start.plus(Duration.ofMinutes(2)));
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t2", start)
+        .withBlockedTime("t2", Duration.ofMinutes(0));
+
+    // When:
+    collector.run();
+
+    // Then:
+    verify(reporter).cleanup("query-thread-saturation", ImmutableMap.of("thread-id", "t1"));
+  }
+
+  @Test
+  public void shouldCleanupQueryMetricWhenRuntimeRemoved() {
+    // Given:
+    final Instant start = Instant.now();
+    when(clock.get()).thenReturn(start);
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start)
+        .withBlockedTime("t1", Duration.ofMinutes(0));
+    collector.run();
+    when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(query2));
+
+    // When:
+    collector.run();
+
+    // Then:
+    verify(reporter).cleanup("node-query-saturation", ImmutableMap.of("query-id", "hootie"));
+  }
+
+  @Test
+  public void shouldAddPointsForQueriesSharingRuntimes() {
+    // Given:
+    final Instant start = Instant.now();
+    when(clock.get()).thenReturn(start);
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(2));
+    collector.run();
+    when(clock.get()).thenReturn(start.plus(WINDOW));
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(3));
+
+    // When:
+    collector.run();
+
+    // Then:
+    final DataPoint point = verifyAndGetLatestDataPoint(
+        "node-query-saturation",
+        ImmutableMap.of("query-id", "boom")
+    );
+    assertThat((Double) point.getValue(), closeTo(.9, .01));
+  }
+
+  @Test
+  public void shouldCleanupPointsForQueriesFromSharedRuntimes() {
+    // Given:
+    final Instant start = Instant.now();
+    when(clock.get()).thenReturn(start);
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(2));
+    collector.run();
+    when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(query1, query2));
+    when(clock.get()).thenReturn(start.plus(WINDOW));
+    givenMetrics(kafkaStreams1)
+        .withThreadStartTime("t1", start.minus(WINDOW.multipliedBy(2)))
+        .withBlockedTime("t1", Duration.ofMinutes(3));
+
+    // When:
+    collector.run();
+
+    // Then:
+    verify(reporter).cleanup("node-query-saturation", ImmutableMap.of("query-id", "boom"));
+    verify(reporter, times(0)).cleanup("node-query-saturation", ImmutableMap.of("query-id", "hoo"));
+  }
+
+  private List<DataPoint> verifyAndGetDataPoints(final String name, final Map<String, String> tag) {
+    verify(reporter, atLeastOnce()).report(reportedPoints.capture());
+    return reportedPoints.getAllValues().stream()
+        .flatMap(List::stream)
+        .filter(p -> p.getName().equals(name))
+        .filter(p -> p.getTags().entrySet().containsAll(tag.entrySet()))
+        .collect(Collectors.toList());
+  }
+
+  private DataPoint verifyAndGetLatestDataPoint(final String name, final Map<String, String> tag) {
+    final List<DataPoint> found = verifyAndGetDataPoints(name, tag);
+    assertThat(found, hasSize(greaterThan(0)));
+    return found.get(found.size() - 1);
+  }
+
+  private void verifyNoDataPoints(final String name, final Map<String, String> tag) {
+    verify(reporter, atLeastOnce()).report(reportedPoints.capture());
+    final List<DataPoint> found = verifyAndGetDataPoints(name, tag);
+    assertThat(found, hasSize(equalTo(0)));
+  }
+
+  private GivenMetrics givenMetrics(final KafkaStreams kafkaStreams) {
+    return new GivenMetrics(kafkaStreams);
+  }
+
+  private static final class GivenMetrics {
+    final Map<MetricName, Metric> metrics = new HashMap<>();
+
+    private GivenMetrics(final KafkaStreams kafkaStreams) {
+      when(kafkaStreams.metrics()).thenReturn((Map) metrics);
+    }
+
+    private GivenMetrics withBlockedTime(final String threadName, final Duration blockedTime) {
+      final MetricName metricName = new MetricName(
+          "blocked-time-ns-total",
+          "stream-thread-metrics",
+          "",
+          ImmutableMap.of("thread-id", threadName)
+      );
+      metrics.put(
+          metricName,
+          new Metric() {
+            @Override
+            public MetricName metricName() {
+              return metricName;
+            }
+
+            @Override
+            public Object metricValue() {
+              return (double) blockedTime.toNanos();
+            }
+          }
+      );
+      return this;
+    }
+
+    private GivenMetrics withThreadStartTime(final String threadName, final Instant startTime) {
+      final MetricName metricName = new MetricName(
+          "thread-start-time",
+          "stream-thread-metrics",
+          "",
+          ImmutableMap.of("thread-id", threadName)
+      );
+      metrics.put(
+          metricName,
+          new Metric() {
+            @Override
+            public MetricName metricName() {
+              return metricName;
+            }
+
+            @Override
+            public Object metricValue() {
+              return startTime.toEpochMilli();
+            }
+          }
+      );
+      return this;
+    }
+  }
+}
\ No newline at end of file
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
index 14b82dc170a1..be300481d6ca 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
@@ -42,6 +42,7 @@
 import io.confluent.ksql.function.InternalFunctionRegistry;
 import io.confluent.ksql.function.MutableFunctionRegistry;
 import io.confluent.ksql.function.UserFunctionLoader;
+import io.confluent.ksql.internal.JmxDataPointsReporter;
 import io.confluent.ksql.internal.PullQueryExecutorMetrics;
 import io.confluent.ksql.logging.processing.ProcessingLogConfig;
 import io.confluent.ksql.logging.processing.ProcessingLogContext;
@@ -108,6 +109,7 @@
 import io.confluent.ksql.util.ReservedInternalTopics;
 import io.confluent.ksql.util.RetryUtil;
 import io.confluent.ksql.util.WelcomeMsgUtils;
+import io.confluent.ksql.utilization.PersistentQuerySaturationMetrics;
 import io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent;
 import io.confluent.ksql.version.metrics.VersionCheckerAgent;
 import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
@@ -134,6 +136,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -700,6 +703,11 @@ static KsqlRestApplication buildApplication(
     final SpecificQueryIdGenerator specificQueryIdGenerator =
         new SpecificQueryIdGenerator();
 
+    final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder()
+            .setNameFormat("ksql-csu-metrics-reporter-%d")
+            .build()
+    );
     final KsqlEngine ksqlEngine = new KsqlEngine(
         serviceContext,
         processingLogContext,
@@ -710,6 +718,20 @@ static KsqlRestApplication buildApplication(
         Collections.emptyList()
     );
 
+    final PersistentQuerySaturationMetrics saturation = new PersistentQuerySaturationMetrics(
+        ksqlEngine,
+        new JmxDataPointsReporter(
+            MetricCollectors.getMetrics(), "ksqldb_utilization", Duration.ofMinutes(1)),
+        Duration.ofMinutes(5),
+        Duration.ofSeconds(30)
+    );
+    executorService.scheduleAtFixedRate(
+        saturation,
+        0,
+        Duration.ofMinutes(1).toMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
     UserFunctionLoader.newInstance(ksqlConfig, functionRegistry, ksqlInstallDir).load();
 
     final String commandTopicName = ReservedInternalTopics.commandTopic(ksqlConfig);
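With the wiring above, the collector runs once a minute and exports the gauges through the server's Kafka `Metrics` registry. A sketch of reading the node-level gauge over JMX — this assumes the registry has a `JmxReporter` attached; the domain and exact `ObjectName` below are illustrative assumptions, not taken from this patch:

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public final class SaturationProbe {
  public static void main(final String[] args) throws Exception {
    final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    // Hypothetical ObjectName; the actual domain depends on how the JmxReporter is configured.
    final ObjectName name =
        new ObjectName("io.confluent.ksql.metrics:type=ksqldb_utilization");
    // Attribute names match the reported DataPoint names.
    final Object value = server.getAttribute(name, "max-node-query-saturation");
    System.out.println("node saturation: " + value); // null once the point goes stale
  }
}
```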