diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/StorageUtilizationMetricsReporter.java b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/StorageUtilizationMetricsReporter.java index 0449e295294a..b6b906473a4a 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/StorageUtilizationMetricsReporter.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/StorageUtilizationMetricsReporter.java @@ -27,8 +27,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -47,10 +49,12 @@ public class StorageUtilizationMetricsReporter implements MetricsReporter { private static final Logger LOGGER = LoggerFactory.getLogger(StorageUtilizationMetricsReporter.class); private static final String METRIC_GROUP = "ksqldb_utilization"; + private static final String TASK_STORAGE_USED_BYTES = "task_storage_used_bytes"; private Map> metricsSeen; private Metrics metricRegistry; private static Map customTags = new HashMap<>(); + private static AtomicInteger numberStatefulTasks = new AtomicInteger(0); public StorageUtilizationMetricsReporter() { } @@ -82,7 +86,10 @@ public static void configureShared( metricRegistry.metricName("node_storage_used_bytes", METRIC_GROUP, customTags); final MetricName nodePct = metricRegistry.metricName("storage_utilization", METRIC_GROUP, customTags); - + final MetricName maxTaskPerNode = + metricRegistry.metricName("max_task_storage_used_bytes", METRIC_GROUP, customTags); + final MetricName numStatefulTasks = + metricRegistry.metricName("num_stateful_tasks", METRIC_GROUP, customTags); metricRegistry.addMetric( nodeAvailable, (Gauge) (config, now) -> baseDir.getFreeSpace() @@ -101,6 +108,14 @@ public static void configureShared( (((double) baseDir.getTotalSpace() - (double) baseDir.getFreeSpace()) / (double) baseDir.getTotalSpace()) ); + metricRegistry.addMetric( + maxTaskPerNode, + (Gauge) (config, now) -> (getMaxTaskUsage(metricRegistry)) + ); + metricRegistry.addMetric( + numStatefulTasks, + (Gauge) (config, now) -> (numberStatefulTasks.get()) + ); } @Override @@ -176,10 +191,11 @@ private synchronized void handleNewSstFilesSizeMetric( final TaskStorageMetric newMetric; // We haven't seen a metric for this query's task before if (!metricsSeen.get(queryId).containsKey(taskId)) { + numberStatefulTasks.getAndIncrement(); // create a new task level metric to track state store storage usage newMetric = new TaskStorageMetric( metricRegistry.metricName( - "task_storage_used_bytes", + TASK_STORAGE_USED_BYTES, METRIC_GROUP, taskMetricTags )); @@ -205,6 +221,7 @@ private synchronized void handleRemovedSstFileSizeMetric( ) { // remove storage metric for this task taskMetric.remove(metric); + numberStatefulTasks.getAndDecrement(); if (taskMetric.metrics.size() == 0) { // no more storage metrics for this task, can remove task gauge metricRegistry.removeMetric(taskMetric.metricName); @@ -227,6 +244,21 @@ private BigInteger computeQueryMetric(final String queryId) { } return queryMetricSum; } + + public static synchronized BigInteger getMaxTaskUsage(final Metrics metricRegistry) { + final Collection taskMetrics = metricRegistry + .metrics() + .entrySet() + .stream() + .filter(e -> e.getKey().name().contains(TASK_STORAGE_USED_BYTES)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + .values(); + final Optional maxOfTaskMetrics = taskMetrics + .stream() + .map(e -> (BigInteger) e.metricValue()) + .reduce(BigInteger::max); + return maxOfTaskMetrics.orElse(BigInteger.ZERO); + } private synchronized Collection> getGaugesForQuery(final String queryId) { return metricsSeen.get(queryId).values().stream() diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/internal/StorageUtilizationMetricsReporterTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/internal/StorageUtilizationMetricsReporterTest.java index 4f0849aa664f..f5008dd1425e 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/internal/StorageUtilizationMetricsReporterTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/internal/StorageUtilizationMetricsReporterTest.java @@ -3,7 +3,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; @@ -16,12 +19,16 @@ import java.io.File; import java.io.IOException; import java.math.BigInteger; +import java.util.Collections; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricValueProvider; import org.apache.kafka.common.metrics.Metrics; +import org.codehaus.janino.Java; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -35,6 +42,7 @@ public class StorageUtilizationMetricsReporterTest { private static final String KAFKA_METRIC_NAME = "total-sst-files-size"; private static final String KAFKA_METRIC_GROUP = "streams-metric"; + private static final String KSQL_METRIC_GROUP = "ksqldb_utilization"; private static final String THREAD_ID = "_confluent_blahblah_query_CTAS_TEST_1-blahblah"; private static final String TRANSIENT_THREAD_ID = "_confluent_blahblah_transient_blahblah_4-blahblah"; private static final String TASK_STORAGE_METRIC = "task_storage_used_bytes"; @@ -85,12 +93,18 @@ public void shouldAddNodeMetricsOnConfigure() throws IOException { final Object storageUsedValue = storageUsedGauge.value(null, 0); final Gauge pctUsedGauge = verifyAndGetRegisteredMetric("storage_utilization", BASE_TAGS); final Object pctUsedValue = pctUsedGauge.value(null, 0); - + final Gauge maxTaskUsageGauge = verifyAndGetRegisteredMetric("max_task_storage_used_bytes", BASE_TAGS); + final Object maxTaskUsageValue = maxTaskUsageGauge.value(null, 0); + final Gauge numStatefulTasksGauge = verifyAndGetRegisteredMetric("num_stateful_tasks", BASE_TAGS); + final Object numStatefulTasksValue = numStatefulTasksGauge.value(null, 0); + // Then: assertThat((Long) storageFreeValue, greaterThan(0L)); assertThat((Long) storageTotalValue, greaterThan(0L)); assertThat((Long) storageUsedValue, greaterThan(0L)); assertThat((Double) pctUsedValue, greaterThan(0.0)); + assertThat((BigInteger) maxTaskUsageValue, greaterThanOrEqualTo(BigInteger.ZERO)); + assertEquals(numStatefulTasksValue, 7); } @Test @@ -253,6 +267,70 @@ public void shouldIgnoreNonSSTMetrics() { // Then: assertThrows(AssertionError.class, () -> verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, TASK_ONE_TAGS)); } + + @Test + public void shouldRecordMaxTaskUsage() { + // Given: + KafkaMetric m1 = mockMetric( + KSQL_METRIC_GROUP, + TASK_STORAGE_METRIC, + BigInteger.valueOf(2), + ImmutableMap.of("task-id", "t1")); + KafkaMetric m2 = mockMetric( + KSQL_METRIC_GROUP, + TASK_STORAGE_METRIC, + BigInteger.valueOf(5), + ImmutableMap.of("task-id", "t2")); + MetricName n1 = m1.metricName(); + MetricName n2 = m2.metricName(); + when(metrics.metrics()).thenReturn(ImmutableMap.of(n1, m1, n2, m2)); + + // When: + listener.metricChange(m1); + listener.metricChange(m2); + + // Then: + BigInteger maxVal = StorageUtilizationMetricsReporter.getMaxTaskUsage(metrics); + assertTrue(maxVal.equals(BigInteger.valueOf(5))); + } + + @Test + public void shouldRecordMaxTaskUsageWithNoTasks() { + // Given: + when(metrics.metrics()).thenReturn(Collections.EMPTY_MAP); + + // When: + + // Then: + BigInteger maxVal = StorageUtilizationMetricsReporter.getMaxTaskUsage(metrics); + assertTrue(maxVal.equals(BigInteger.valueOf(0))); + } + + @Test + public void shouldRecordNumStatefulTasks() { + // Given: + final File f = new File("/tmp/storage-test/"); + listener.configureShared(f, metrics, BASE_TAGS); + final Gauge numStatefulTasksGauge = verifyAndGetRegisteredMetric("num_stateful_tasks", BASE_TAGS); + + // When: + listener.metricChange(mockMetric( + KAFKA_METRIC_GROUP, + KAFKA_METRIC_NAME, + BigInteger.valueOf(2), + ImmutableMap.of("store-id", "s1", "task-id", "t2", "thread-id", TRANSIENT_THREAD_ID)) + ); + listener.metricChange(mockMetric( + KAFKA_METRIC_GROUP, + KAFKA_METRIC_NAME, + BigInteger.valueOf(5), + ImmutableMap.of("store-id", "s2", "task-id", "t1", "thread-id", TRANSIENT_THREAD_ID)) + ); + + // Then: + final Object numStatefulTasksValue = numStatefulTasksGauge.value(null, 0); + assertEquals((int) numStatefulTasksValue, 2); + } private KafkaMetric mockMetric( final String group, final String name, Object value, final Map tags) {