feat: add max task usage and num stateful tasks metrics (#8549)
* feat: add max task usage metric

* checkstyle

* add number stateful tasks

* change from atomic int to int

* handle NPEs

* review comments

* review comments

* make name constant
lct45 committed Jan 4, 2022
1 parent 631d875 commit ba27e3a
Showing 2 changed files with 113 additions and 3 deletions.
@@ -27,8 +27,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -47,10 +49,12 @@ public class StorageUtilizationMetricsReporter implements MetricsReporter {
private static final Logger LOGGER
= LoggerFactory.getLogger(StorageUtilizationMetricsReporter.class);
private static final String METRIC_GROUP = "ksqldb_utilization";
private static final String TASK_STORAGE_USED_BYTES = "task_storage_used_bytes";

private Map<String, Map<String, TaskStorageMetric>> metricsSeen;
private Metrics metricRegistry;
private static Map<String, String> customTags = new HashMap<>();
private static AtomicInteger numberStatefulTasks = new AtomicInteger(0);

public StorageUtilizationMetricsReporter() {
}
@@ -82,7 +86,10 @@ public static void configureShared(
metricRegistry.metricName("node_storage_used_bytes", METRIC_GROUP, customTags);
final MetricName nodePct =
metricRegistry.metricName("storage_utilization", METRIC_GROUP, customTags);

final MetricName maxTaskPerNode =
metricRegistry.metricName("max_task_storage_used_bytes", METRIC_GROUP, customTags);
final MetricName numStatefulTasks =
metricRegistry.metricName("num_stateful_tasks", METRIC_GROUP, customTags);
metricRegistry.addMetric(
nodeAvailable,
(Gauge<Long>) (config, now) -> baseDir.getFreeSpace()
@@ -101,6 +108,14 @@ public static void configureShared(
(((double) baseDir.getTotalSpace() - (double) baseDir.getFreeSpace())
/ (double) baseDir.getTotalSpace())
);
metricRegistry.addMetric(
maxTaskPerNode,
(Gauge<BigInteger>) (config, now) -> (getMaxTaskUsage(metricRegistry))
);
metricRegistry.addMetric(
numStatefulTasks,
(Gauge<Integer>) (config, now) -> (numberStatefulTasks.get())
);
}

@Override
@@ -176,10 +191,11 @@ private synchronized void handleNewSstFilesSizeMetric(
final TaskStorageMetric newMetric;
// We haven't seen a metric for this query's task before
if (!metricsSeen.get(queryId).containsKey(taskId)) {
numberStatefulTasks.getAndIncrement();
// create a new task level metric to track state store storage usage
newMetric = new TaskStorageMetric(
metricRegistry.metricName(
"task_storage_used_bytes",
TASK_STORAGE_USED_BYTES,
METRIC_GROUP,
taskMetricTags
));
@@ -205,6 +221,7 @@ private synchronized void handleRemovedSstFileSizeMetric(
) {
// remove storage metric for this task
taskMetric.remove(metric);
if (taskMetric.metrics.size() == 0) {
// no more storage metrics for this task, can remove task gauge
metricRegistry.removeMetric(taskMetric.metricName);
numberStatefulTasks.getAndDecrement();
@@ -227,6 +244,21 @@ private BigInteger computeQueryMetric(final String queryId) {
}
return queryMetricSum;
}

public static synchronized BigInteger getMaxTaskUsage(final Metrics metricRegistry) {
final Collection<KafkaMetric> taskMetrics = metricRegistry
.metrics()
.entrySet()
.stream()
.filter(e -> e.getKey().name().contains(TASK_STORAGE_USED_BYTES))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
.values();
final Optional<BigInteger> maxOfTaskMetrics = taskMetrics
.stream()
.map(e -> (BigInteger) e.metricValue())
.reduce(BigInteger::max);
return maxOfTaskMetrics.orElse(BigInteger.ZERO);
}

private synchronized Collection<Supplier<BigInteger>> getGaugesForQuery(final String queryId) {
return metricsSeen.get(queryId).values().stream()
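For reference, the two node-level gauges registered in configureShared above can be read back from the same Metrics registry. A minimal sketch, assuming a registry on which configureShared has already been called; the metric names and group are taken from the diff, while the probe class and method names are illustrative and not part of this commit:

import java.util.Map;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;

public final class UtilizationMetricsProbe {
  // Illustrative helper: prints the node-level utilization gauges that
  // StorageUtilizationMetricsReporter.configureShared registers.
  public static void dumpNodeGauges(final Metrics metricRegistry) {
    for (final Map.Entry<MetricName, KafkaMetric> entry : metricRegistry.metrics().entrySet()) {
      final MetricName name = entry.getKey();
      if (!"ksqldb_utilization".equals(name.group())) {
        continue;
      }
      if ("max_task_storage_used_bytes".equals(name.name())
          || "num_stateful_tasks".equals(name.name())) {
        // metricValue() evaluates the Gauge lambda registered in configureShared.
        System.out.println(name.name() + " = " + entry.getValue().metricValue());
      }
    }
  }
}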
@@ -3,7 +3,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
@@ -16,12 +19,16 @@
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Metrics;
import org.codehaus.janino.Java;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -35,6 +42,7 @@ public class StorageUtilizationMetricsReporterTest {

private static final String KAFKA_METRIC_NAME = "total-sst-files-size";
private static final String KAFKA_METRIC_GROUP = "streams-metric";
private static final String KSQL_METRIC_GROUP = "ksqldb_utilization";
private static final String THREAD_ID = "_confluent_blahblah_query_CTAS_TEST_1-blahblah";
private static final String TRANSIENT_THREAD_ID = "_confluent_blahblah_transient_blahblah_4-blahblah";
private static final String TASK_STORAGE_METRIC = "task_storage_used_bytes";
@@ -85,12 +93,18 @@ public void shouldAddNodeMetricsOnConfigure() throws IOException {
final Object storageUsedValue = storageUsedGauge.value(null, 0);
final Gauge<?> pctUsedGauge = verifyAndGetRegisteredMetric("storage_utilization", BASE_TAGS);
final Object pctUsedValue = pctUsedGauge.value(null, 0);

final Gauge<?> maxTaskUsageGauge = verifyAndGetRegisteredMetric("max_task_storage_used_bytes", BASE_TAGS);
final Object maxTaskUsageValue = maxTaskUsageGauge.value(null, 0);
final Gauge<?> numStatefulTasksGauge = verifyAndGetRegisteredMetric("num_stateful_tasks", BASE_TAGS);
final Object numStatefulTasksValue = numStatefulTasksGauge.value(null, 0);

// Then:
assertThat((Long) storageFreeValue, greaterThan(0L));
assertThat((Long) storageTotalValue, greaterThan(0L));
assertThat((Long) storageUsedValue, greaterThan(0L));
assertThat((Double) pctUsedValue, greaterThan(0.0));
assertThat((BigInteger) maxTaskUsageValue, greaterThanOrEqualTo(BigInteger.ZERO));
assertThat((Integer) numStatefulTasksValue, greaterThanOrEqualTo(0));
}

@Test
@@ -253,6 +267,70 @@ public void shouldIgnoreNonSSTMetrics() {
// Then:
assertThrows(AssertionError.class, () -> verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, TASK_ONE_TAGS));
}

@Test
public void shouldRecordMaxTaskUsage() {
// Given:
KafkaMetric m1 = mockMetric(
KSQL_METRIC_GROUP,
TASK_STORAGE_METRIC,
BigInteger.valueOf(2),
ImmutableMap.of("task-id", "t1"));
KafkaMetric m2 = mockMetric(
KSQL_METRIC_GROUP,
TASK_STORAGE_METRIC,
BigInteger.valueOf(5),
ImmutableMap.of("task-id", "t2"));
MetricName n1 = m1.metricName();
MetricName n2 = m2.metricName();
when(metrics.metrics()).thenReturn(ImmutableMap.of(n1, m1, n2, m2));

// When:
listener.metricChange(m1);
listener.metricChange(m2);

// Then:
BigInteger maxVal = StorageUtilizationMetricsReporter.getMaxTaskUsage(metrics);
assertTrue(maxVal.equals(BigInteger.valueOf(5)));
}

@Test
public void shouldRecordMaxTaskUsageWithNoTasks() {
// Given:
when(metrics.metrics()).thenReturn(Collections.EMPTY_MAP);

// When:

// Then:
BigInteger maxVal = StorageUtilizationMetricsReporter.getMaxTaskUsage(metrics);
assertTrue(maxVal.equals(BigInteger.valueOf(0)));
}

@Test
public void shouldRecordNumStatefulTasks() {
// Given:
final File f = new File("/tmp/storage-test/");
listener.configureShared(f, metrics, BASE_TAGS);
final Gauge<?> numStatefulTasksGauge = verifyAndGetRegisteredMetric("num_stateful_tasks", BASE_TAGS);

// When:
listener.metricChange(mockMetric(
KAFKA_METRIC_GROUP,
KAFKA_METRIC_NAME,
BigInteger.valueOf(2),
ImmutableMap.of("store-id", "s1", "task-id", "t2", "thread-id", TRANSIENT_THREAD_ID))
);
listener.metricChange(mockMetric(
KAFKA_METRIC_GROUP,
KAFKA_METRIC_NAME,
BigInteger.valueOf(5),
ImmutableMap.of("store-id", "s2", "task-id", "t1", "thread-id", TRANSIENT_THREAD_ID))
);

// Then:
final Object numStatefulTasksValue = numStatefulTasksGauge.value(null, 0);
assertEquals(2, (int) numStatefulTasksValue);
}

private KafkaMetric mockMetric(
final String group, final String name, Object value, final Map<String, String> tags) {
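The mock-based tests above can also be cross-checked against a real registry. Purely as an illustrative sketch (not part of this commit), the following standalone example registers two task-level gauges under the same name and group the reporter uses and applies the same max-with-ZERO-default reduction as getMaxTaskUsage:

import java.math.BigInteger;
import java.util.Map;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;

public final class MaxTaskUsageSketch {
  public static void main(final String[] args) {
    try (Metrics metrics = new Metrics()) {
      // Register two per-task gauges the way the reporter does, one per task id.
      metrics.addMetric(
          metrics.metricName("task_storage_used_bytes", "ksqldb_utilization", Map.of("task-id", "t1")),
          (Gauge<BigInteger>) (config, now) -> BigInteger.valueOf(2));
      metrics.addMetric(
          metrics.metricName("task_storage_used_bytes", "ksqldb_utilization", Map.of("task-id", "t2")),
          (Gauge<BigInteger>) (config, now) -> BigInteger.valueOf(5));
      // Same reduction as getMaxTaskUsage: take the max task gauge value,
      // falling back to ZERO when no task-level gauges are registered yet.
      final BigInteger max = metrics.metrics().entrySet().stream()
          .filter(e -> e.getKey().name().contains("task_storage_used_bytes"))
          .map(e -> (BigInteger) e.getValue().metricValue())
          .reduce(BigInteger.ZERO, BigInteger::max);
      System.out.println("max task usage = " + max); // prints 5
    }
  }
}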
