-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add max task usage and num stateful tasks metrics #8549
Conversation
@@ -101,6 +108,14 @@ public static void configureShared( | |||
(((double) baseDir.getTotalSpace() - (double) baseDir.getFreeSpace()) | |||
/ (double) baseDir.getTotalSpace()) | |||
); | |||
metricRegistry.addMetric( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually getting NPEs from these two, it seems like configureShared
is called before configure
to the metricRegistry
and numberStatefulTasks
aren't initialized before they're called in the gauges
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the gauges to handle the if the items are null, moving them to configure
means they won't get set at start-up and you'll have to check to see if it's already been registered so we don't get errors about double registering metrics
> SELECT profileId,
> LATEST_BY_OFFSET(latitude) AS la,
> LATEST_BY_OFFSET(longitude) AS lo
> FROM riderlocations
> GROUP BY profileId
> EMIT CHANGES;
Could not write the statement 'CREATE TABLE currentLocation AS
SELECT profileId,
LATEST_BY_OFFSET(latitude) AS la,
LATEST_BY_OFFSET(longitude) AS lo
FROM riderlocations
GROUP BY profileId
EMIT CHANGES;' into the command topic.
Caused by: A metric named 'MetricName [name=max_task_storage_used_bytes,
group=ksqldb_utilization, description=, tags={}]' already exists, can't register
another one.```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand the above issue: what object the NPE is referring to? The metric names like numberStatefulTasks
should be inside the registry, and here we are just adding a new metric under that name, and numberStatefulTasks
are already static so it should have been initialized at the time this line is executed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The NPE was when the gauge was being sampled, so when getMaxTaskUsage
was called it tried to access the metrics registry before it had been initialized since it gets initialized in configure
. This actually shouldn't happen now that I pass the metrics registry into getMaxTaskUsage
. The other NPE was for numberStatefulTasks
which was initialized in configure
so when the gauge called numberStatefulTasks.get()
it returned a NPE. This is now initialized to 0 so that NPE is also resolved
@@ -49,8 +51,9 @@ | |||
private static final String METRIC_GROUP = "ksqldb_utilization"; | |||
|
|||
private Map<String, Map<String, TaskStorageMetric>> metricsSeen; | |||
private Metrics metricRegistry; | |||
private static Metrics metricRegistry; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be static - just leave it local to each member. We intentionally made it not-static to avoid re-using the instance across test runs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to still reference it from a static context?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't change this because I'm not sure how to handle it, configureShared
is called from a static context so it has to be static, meaning getMaxTaskUsage
has to be static which calls metricRegistry.metrics() which intellij is also telling me needs to be static to be called from this context. I can create a helper function to get
metricRegistry.metrics()` but it seems like it would have the same issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
configureShared
is passed a MetricsRegistry
instance that you can register the metrics in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah makes sense, just updated
@@ -65,6 +68,7 @@ public synchronized void configure(final Map<String, ?> map) { | |||
map.get(KsqlConfig.KSQL_INTERNAL_METRICS_CONFIG) | |||
); | |||
this.metricsSeen = new HashMap<>(); | |||
this.numberStatefulTasks = new AtomicInteger(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a static, so we shouldn't re-initialize for each instance. We should just initialize this statically (e.g. private static AtomicInteger numberStatefulTasks = new AtomicInteger(0);
) or from configureShared
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just changed it to be initialized statically, updated the gauge as well
if (metricRegistry == null) { | ||
return BigInteger.ZERO; | ||
} | ||
final Collection<KafkaMetric> taskMetrics = metricRegistry |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is kind of expensive. Once this is shipped can you just see what happens to the cpu of an idle instance? I don't think it will make much difference because it only runs every 30 seconds. But better to check anyway. Alternatively we could keep a static set of the tasks and a max that's updated on every new or removed task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
idle instance as in an instance with no queries or instance with queries but no data? I can check after shipping, it's definitely ... unideal, but storing the tasks / max had a longer LOE so went with this first.
Would the static max update as the underlying task metrics do? I wanted to make sure we were getting the max of all the most recent gauge values
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
idle instance as in an instance with no queries or instance with queries but no data? I can check after shipping, it's definitely ... unideal, but storing the tasks / max had a longer LOE so went with this first.
Yep. An idle cluster will still have loads of metrics, so seeing if the cpu usage for an idle cluster is higher than normal is a good test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would the static max update as the underlying task metrics do? I wanted to make sure we were getting the max of all the most recent gauge values
Good point - the space used is not known at the time the metric is added. If the current impl turns out too expensive we can just maintain a static mapping of task to sst file used metrics and then take the max every time the metric is sampled (rather than filtering over every metric like we're doing here).
@@ -253,6 +267,58 @@ public void shouldIgnoreNonSSTMetrics() { | |||
// Then: | |||
assertThrows(AssertionError.class, () -> verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, TASK_ONE_TAGS)); | |||
} | |||
|
|||
@Test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a case for max task when there are 0 tasks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with one nit inline
.metrics() | ||
.entrySet() | ||
.stream() | ||
.filter(e -> e.getKey().name().contains("task_storage_used_bytes")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put the metric name in a constant
Description
We're adding a metric (
max_task_storage_used_bytes
) that takes the maximum task disk usage on a node and emits that. This allows users to see the maximum disk space a single task is taking up without having to filter through all the task metrics on their side.We're also adding a metric
num_stateful_tasks
that records the number of stateful tasks on a node. These two metrics combined will allow users to better understand their task work distribution and understand how to tune their system for their needs.Testing done
unit tests, tested on jconsole
Reviewer checklist