feat: add max task usage and num stateful tasks metrics #8549

Merged (8 commits) on Jan 4, 2022
Changes from 6 commits
@@ -27,8 +27,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -49,8 +51,9 @@ public class StorageUtilizationMetricsReporter implements MetricsReporter {
private static final String METRIC_GROUP = "ksqldb_utilization";

private Map<String, Map<String, TaskStorageMetric>> metricsSeen;
- private Metrics metricRegistry;
+ private static Metrics metricRegistry;
Contributor

This should not be static; keep it as an instance field. We intentionally made it non-static to avoid reusing the instance across test runs.

Contributor Author

Is there a way to still reference it from a static context?

Contributor Author

I didn't change this because I'm not sure how to handle it. configureShared is called from a static context, so it has to be static. That means getMaxTaskUsage has to be static too, and it calls metricRegistry.metrics(), which IntelliJ tells me also needs to be static to be called from this context. I could create a helper function to get metricRegistry.metrics(), but it seems like it would have the same issue.

Contributor

configureShared is passed the metrics registry instance, and you can register the metrics in that.
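
One way to read this suggestion is sketched below: because the registry arrives as an argument of the static method, the gauge lambda can capture that argument, and no static field is needed. `SketchRegistry` and `SharedConfigSketch` are hypothetical stand-ins written for this illustration; they are not the Kafka `Metrics` class or the reporter changed in this PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for a metrics registry (assumption, not the Kafka API).
final class SketchRegistry {
  private final Map<String, Supplier<Integer>> gauges = new ConcurrentHashMap<>();

  void addGauge(final String name, final Supplier<Integer> gauge) {
    gauges.put(name, gauge);
  }

  Integer sample(final String name) {
    return gauges.get(name).get();
  }

  int size() {
    return gauges.size();
  }
}

final class SharedConfigSketch {
  private SharedConfigSketch() {
  }

  // Static entry point: the registry is a parameter, so each caller
  // (for example, each test run) can supply its own instance.
  static void configureShared(final SketchRegistry registry) {
    // The lambda captures the effectively final 'registry' parameter,
    // not a static field.
    registry.addGauge("registered_gauges", () -> registry.size());
  }
}
```

With this shape, two registries configured separately stay independent: sampling the gauge on one registry reports only that registry's own state.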

Contributor Author

Ah makes sense, just updated

private static Map<String, String> customTags = new HashMap<>();
private static AtomicInteger numberStatefulTasks = new AtomicInteger(0);

public StorageUtilizationMetricsReporter() {
}
@@ -82,7 +85,10 @@ public static void configureShared(
metricRegistry.metricName("node_storage_used_bytes", METRIC_GROUP, customTags);
final MetricName nodePct =
metricRegistry.metricName("storage_utilization", METRIC_GROUP, customTags);

final MetricName maxTaskPerNode =
metricRegistry.metricName("max_task_storage_used_bytes", METRIC_GROUP, customTags);
final MetricName numStatefulTasks =
metricRegistry.metricName("num_stateful_tasks", METRIC_GROUP, customTags);
metricRegistry.addMetric(
nodeAvailable,
(Gauge<Long>) (config, now) -> baseDir.getFreeSpace()
@@ -101,6 +107,14 @@ public static void configureShared(
(((double) baseDir.getTotalSpace() - (double) baseDir.getFreeSpace())
/ (double) baseDir.getTotalSpace())
);
metricRegistry.addMetric(
Contributor Author

I'm actually getting NPEs from these two. It seems configureShared is called before configure, so metricRegistry and numberStatefulTasks aren't initialized by the time the gauges read them.

Contributor Author
@lct45, Dec 21, 2021

Updated the gauges to handle the case where these items are null. Moving the registration to configure would mean the metrics don't get set at start-up, and we'd have to check whether each one has already been registered to avoid double-registration errors like this one:

>  SELECT profileId,
>         LATEST_BY_OFFSET(latitude) AS la,
>         LATEST_BY_OFFSET(longitude) AS lo
>  FROM riderlocations
>  GROUP BY profileId
>  EMIT CHANGES;
Could not write the statement 'CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderlocations
  GROUP BY profileId
  EMIT CHANGES;' into the command topic.
Caused by: A metric named 'MetricName [name=max_task_storage_used_bytes,
	group=ksqldb_utilization, description=, tags={}]' already exists, can't register
	another one.
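
The "check whether it's already been registered" idea mentioned above could look roughly like the following. This is a minimal sketch over a hypothetical map-backed registry (`IdempotentRegistrationSketch` is an illustrative name, not part of ksqlDB or Kafka): one method rejects a duplicate name, similar in spirit to the error above, and the other registers only when the name is free.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical map-backed registry used only to illustrate the check.
final class IdempotentRegistrationSketch {
  private final Map<String, Supplier<Long>> gauges = new ConcurrentHashMap<>();

  // Strict registration: a second registration under the same name fails.
  void addStrict(final String name, final Supplier<Long> gauge) {
    if (gauges.putIfAbsent(name, gauge) != null) {
      throw new IllegalArgumentException(
          "A metric named '" + name + "' already exists");
    }
  }

  // Idempotent registration: returns true only if this call added the gauge.
  // putIfAbsent makes the check and the insert a single atomic step.
  boolean addIfAbsent(final String name, final Supplier<Long> gauge) {
    return gauges.putIfAbsent(name, gauge) == null;
  }
}
```

Calling `addIfAbsent` repeatedly with the same name is harmless, which is the property a per-query `configure` path would need if it also tried to register node-level gauges.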

Contributor

I'm not sure I understand the issue above: which object is the NPE referring to? Metric names like numberStatefulTasks should be inside the registry, and here we are just adding a new metric under that name. numberStatefulTasks is already static, so it should have been initialized by the time this line executes.

Contributor Author

The NPE happened when the gauge was sampled: getMaxTaskUsage tried to access the metrics registry before it had been initialized, because the registry is only set in configure. This shouldn't happen now that I pass the metrics registry into getMaxTaskUsage. The other NPE was for numberStatefulTasks, which was also initialized in configure, so numberStatefulTasks.get() threw an NPE when the gauge called it. It is now initialized to 0, so that NPE is resolved as well.
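
The ordering problem described here can be reproduced in isolation. The sketch below is an assumption-laden illustration (`EagerInitSketch` and its members are hypothetical names): a gauge that reads a static field assigned only in `configure()` fails if sampled first, while a field initialized at its declaration, or a null-guarded read, does not.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class EagerInitSketch {
  // Initialized when the class loads, so it is safe to sample at any time.
  private static final AtomicInteger EAGER = new AtomicInteger(0);

  // Assigned only in configure(); null until then.
  private static AtomicInteger lazy;

  private EagerInitSketch() {
  }

  static void configure() {
    lazy = new AtomicInteger(0);
  }

  // Safe before configure(): the counter already exists.
  static Supplier<Integer> eagerGauge() {
    return () -> EAGER.get();
  }

  // Throws NullPointerException if sampled before configure().
  static Supplier<Integer> lazyGauge() {
    return () -> lazy.get();
  }

  // Null-guarded variant: reports 0 until configure() has run.
  static Supplier<Integer> guardedGauge() {
    return () -> lazy == null ? 0 : lazy.get();
  }
}
```

Eager initialization removes the failure mode entirely, whereas the guard only hides it, which may be why initializing the counter to 0 at declaration was the simpler fix here.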

maxTaskPerNode,
(Gauge<BigInteger>) (config, now) -> (getMaxTaskUsage())
);
metricRegistry.addMetric(
numStatefulTasks,
(Gauge<Integer>) (config, now) -> (numberStatefulTasks.get())
);
}

@Override
@@ -176,6 +190,7 @@ private synchronized void handleNewSstFilesSizeMetric(
final TaskStorageMetric newMetric;
// We haven't seen a metric for this query's task before
if (!metricsSeen.get(queryId).containsKey(taskId)) {
numberStatefulTasks.getAndIncrement();
// create a new task level metric to track state store storage usage
newMetric = new TaskStorageMetric(
metricRegistry.metricName(
@@ -205,6 +220,7 @@ private synchronized void handleRemovedSstFileSizeMetric(
) {
// remove storage metric for this task
taskMetric.remove(metric);
numberStatefulTasks.getAndDecrement();
if (taskMetric.metrics.size() == 0) {
// no more storage metrics for this task, can remove task gauge
metricRegistry.removeMetric(taskMetric.metricName);
@@ -227,6 +243,24 @@ private BigInteger computeQueryMetric(final String queryId) {
}
return queryMetricSum;
}

public static synchronized BigInteger getMaxTaskUsage() {
if (metricRegistry == null) {
return BigInteger.ZERO;
}
final Collection<KafkaMetric> taskMetrics = metricRegistry
Contributor

This is kind of expensive. Once this is shipped can you just see what happens to the cpu of an idle instance? I don't think it will make much difference because it only runs every 30 seconds. But better to check anyway. Alternatively we could keep a static set of the tasks and a max that's updated on every new or removed task.

Contributor Author

Idle as in an instance with no queries, or an instance with queries but no data? I can check after shipping. It's definitely not ideal, but storing the tasks and the max had a higher level of effort (LOE), so I went with this first.

Would the static max update as the underlying task metrics do? I wanted to make sure we were getting the max of all the most recent gauge values.

Contributor

> idle instance as in an instance with no queries or instance with queries but no data? I can check after shipping, it's definitely ... unideal, but storing the tasks / max had a longer LOE so went with this first.

Yep. An idle cluster will still have loads of metrics, so seeing if the cpu usage for an idle cluster is higher than normal is a good test.

Contributor
@rodesai, Dec 22, 2021

> Would the static max update as the underlying task metrics do? I wanted to make sure we were getting the max of all the most recent gauge values

Good point: the space used is not known at the time the metric is added. If the current implementation turns out to be too expensive, we can maintain a static mapping from each task to its SST file size metrics and take the max every time the metric is sampled, rather than filtering over every metric as we do here.
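
A rough sketch of that alternative follows, assuming each task can be represented by a supplier of its current usage. `TaskUsageTrackerSketch` and its method names are hypothetical and do not appear in this PR. Because the suppliers are evaluated at sampling time, the max always reflects the latest task values, which addresses the staleness concern raised earlier in the thread.

```java
import java.math.BigInteger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

final class TaskUsageTrackerSketch {
  // task id -> supplier of that task's current storage usage
  private final Map<String, Supplier<BigInteger>> usageByTask =
      new ConcurrentHashMap<>();

  void onTaskAdded(final String taskId, final Supplier<BigInteger> usage) {
    usageByTask.put(taskId, usage);
  }

  void onTaskRemoved(final String taskId) {
    usageByTask.remove(taskId);
  }

  // Evaluates only the tracked task suppliers, not every registered metric.
  BigInteger maxTaskUsage() {
    return usageByTask.values().stream()
        .map(Supplier::get)
        .reduce(BigInteger::max)
        .orElse(BigInteger.ZERO);
  }

  // The map size doubles as a count of tracked tasks.
  int numTasks() {
    return usageByTask.size();
  }
}
```

The cost per sample is then proportional to the number of stateful tasks rather than to the total number of metrics in the registry.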

.metrics()
.entrySet()
.stream()
.filter(e -> e.getKey().name().contains("task_storage_used_bytes"))
Contributor

put the metric name in a constant

.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
.values();
final Optional<BigInteger> maxOfTaskMetrics = taskMetrics
.stream()
.map(e -> (BigInteger) e.metricValue())
.reduce(BigInteger::max);
return maxOfTaskMetrics.orElse(BigInteger.ZERO);
}
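
Following the reviewer's request to put the metric name in a constant, the lookup could be sketched as below. This is an illustration over a hypothetical `Sample` type rather than Kafka's `KafkaMetric`, and it makes two choices that are assumptions, not the PR's code: it skips the intermediate `toMap` collection, and it matches the name exactly, since the string `max_task_storage_used_bytes` contains `task_storage_used_bytes` and a substring filter would also pick up the max gauge itself.

```java
import java.math.BigInteger;
import java.util.Collection;

final class MaxTaskUsageSketch {
  // Single definition of the per-task metric name.
  static final String TASK_STORAGE_METRIC = "task_storage_used_bytes";

  // Hypothetical stand-in for a sampled metric: a name plus its current value.
  static final class Sample {
    final String name;
    final Object value;

    Sample(final String name, final Object value) {
      this.name = name;
      this.value = value;
    }
  }

  private MaxTaskUsageSketch() {
  }

  static BigInteger maxTaskUsage(final Collection<Sample> samples) {
    return samples.stream()
        // Exact match, so the aggregate "max_..." metric is excluded.
        .filter(s -> TASK_STORAGE_METRIC.equals(s.name))
        .map(s -> s.value)
        // Ignore values that are not BigInteger instead of failing on a cast.
        .filter(BigInteger.class::isInstance)
        .map(BigInteger.class::cast)
        .reduce(BigInteger::max)
        .orElse(BigInteger.ZERO);
  }
}
```

An empty collection yields `BigInteger.ZERO`, matching the zero-task case requested in the test review.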

private synchronized Collection<Supplier<BigInteger>> getGaugesForQuery(final String queryId) {
return metricsSeen.get(queryId).values().stream()
@@ -3,7 +3,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
@@ -16,12 +19,16 @@
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Metrics;
import org.codehaus.janino.Java;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -35,6 +42,7 @@ public class StorageUtilizationMetricsReporterTest {

private static final String KAFKA_METRIC_NAME = "total-sst-files-size";
private static final String KAFKA_METRIC_GROUP = "streams-metric";
private static final String KSQL_METRIC_GROUP = "ksqldb_utilization";
private static final String THREAD_ID = "_confluent_blahblah_query_CTAS_TEST_1-blahblah";
private static final String TRANSIENT_THREAD_ID = "_confluent_blahblah_transient_blahblah_4-blahblah";
private static final String TASK_STORAGE_METRIC = "task_storage_used_bytes";
@@ -85,12 +93,18 @@ public void shouldAddNodeMetricsOnConfigure() throws IOException {
final Object storageUsedValue = storageUsedGauge.value(null, 0);
final Gauge<?> pctUsedGauge = verifyAndGetRegisteredMetric("storage_utilization", BASE_TAGS);
final Object pctUsedValue = pctUsedGauge.value(null, 0);

final Gauge<?> maxTaskUsageGauge = verifyAndGetRegisteredMetric("max_task_storage_used_bytes", BASE_TAGS);
final Object maxTaskUsageValue = maxTaskUsageGauge.value(null, 0);
final Gauge<?> numStatefulTasksGauge = verifyAndGetRegisteredMetric("num_stateful_tasks", BASE_TAGS);
final Object numStatefulTasksValue = numStatefulTasksGauge.value(null, 0);

// Then:
assertThat((Long) storageFreeValue, greaterThan(0L));
assertThat((Long) storageTotalValue, greaterThan(0L));
assertThat((Long) storageUsedValue, greaterThan(0L));
assertThat((Double) pctUsedValue, greaterThan(0.0));
assertThat((BigInteger) maxTaskUsageValue, greaterThanOrEqualTo(BigInteger.ZERO));
assertEquals((int) numStatefulTasksValue, 0);
}

@Test
@@ -253,6 +267,70 @@ public void shouldIgnoreNonSSTMetrics() {
// Then:
assertThrows(AssertionError.class, () -> verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, TASK_ONE_TAGS));
}

@Test
Contributor

can you add a case for max task when there are 0 tasks?

Contributor Author

yup

public void shouldRecordMaxTaskUsage() {
// Given:
KafkaMetric m1 = mockMetric(
KSQL_METRIC_GROUP,
TASK_STORAGE_METRIC,
BigInteger.valueOf(2),
ImmutableMap.of("task-id", "t1"));
KafkaMetric m2 = mockMetric(
KSQL_METRIC_GROUP,
TASK_STORAGE_METRIC,
BigInteger.valueOf(5),
ImmutableMap.of("task-id", "t2"));
MetricName n1 = m1.metricName();
MetricName n2 = m2.metricName();
when(metrics.metrics()).thenReturn(ImmutableMap.of(n1, m1, n2, m2));

// When:
listener.metricChange(m1);
listener.metricChange(m2);

// Then:
BigInteger maxVal = StorageUtilizationMetricsReporter.getMaxTaskUsage();
assertTrue(maxVal.equals(BigInteger.valueOf(5)));
}

@Test
public void shouldRecordMaxTaskUsageWithNoTasks() {
// Given:
when(metrics.metrics()).thenReturn(Collections.EMPTY_MAP);

// When:

// Then:
BigInteger maxVal = StorageUtilizationMetricsReporter.getMaxTaskUsage();
assertTrue(maxVal.equals(BigInteger.valueOf(0)));
}

@Test
public void shouldRecordNumStatefulTasks() {
// Given:
final File f = new File("/tmp/storage-test/");
listener.configureShared(f, metrics, BASE_TAGS);
final Gauge<?> numStatefulTasksGauge = verifyAndGetRegisteredMetric("num_stateful_tasks", BASE_TAGS);

// When:
listener.metricChange(mockMetric(
KAFKA_METRIC_GROUP,
KAFKA_METRIC_NAME,
BigInteger.valueOf(2),
ImmutableMap.of("store-id", "s1", "task-id", "t2", "thread-id", TRANSIENT_THREAD_ID))
);
listener.metricChange(mockMetric(
KAFKA_METRIC_GROUP,
KAFKA_METRIC_NAME,
BigInteger.valueOf(5),
ImmutableMap.of("store-id", "s2", "task-id", "t1", "thread-id", TRANSIENT_THREAD_ID))
);

// Then:
final Object numStatefulTasksValue = numStatefulTasksGauge.value(null, 0);
assertEquals((int) numStatefulTasksValue, 2);
}

private KafkaMetric mockMetric(
final String group, final String name, Object value, final Map<String, String> tags) {