add query metrics for broker parallel merges, off by default #8981
Conversation
I'm surprised that off by default means the user has to write code to enable them vs using a config option of some sort. Is that latter option something that's not possible?
Unfortunately, at the moment code is the only way to control the query metrics that are emitted. Related discussion: #6559
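To make that tradeoff concrete, this is roughly the kind of code an operator would have to write. Everything below is a simplified stand-in sketch, not the actual Druid `QueryMetrics` interface, and the metric name is made up:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins, not the real Druid API: illustrates that emitting a
// metric means overriding a report method in code, not flipping a config flag.
interface ToyQueryMetrics
{
  // hypothetical method name modeled on the ones this PR adds
  default void reportParallelMergeParallelism(int parallelism)
  {
    // off by default: the base implementation emits nothing
  }
}

final class EmittingQueryMetrics implements ToyQueryMetrics
{
  final Map<String, Number> emitted = new HashMap<>();

  @Override
  public void reportParallelMergeParallelism(int parallelism)
  {
    // "query/parallelMerge/parallelism" is a made-up metric name for the sketch
    emitted.put("query/parallelMerge/parallelism", parallelism);
  }
}
```

A config flag cannot express this choice today because the decision of which `report*` calls actually emit lives in the `QueryMetrics` implementation itself.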
@@ -247,6 +263,7 @@ public void cleanup(Iterator<T> iterFromMake)
private final long targetTimeNanos;
private final boolean hasTimeout;
private final long timeoutAt;
private final MergeCombineMetricsAccumlator metricsAccumlator;
metricsAccumlator -> metricsAccumulator
{
  long numInputRows = 0;
  long cpuTimeNanos = 0;
  // 1 partition task, 1 layer 2 prepare merge inputs task, 1 layer 1 prepare merge inputs task for each partition
nit: suggest "one layer 2 prepare merge inputs task" or "1 layer two prepare merge inputs task" and similar, I was confused initially with all the numbers
heh, will fix
outputQueue.offer(ResultBatch.TERMINAL);
} else {
  // if priority queue is empty, push the final accumulated value into the output batch and push it out
  outputBatch.add(currentCombinedValue);
  metricsAccumulator.incrementOutputRows(batchCounter + 1L);
Is the 1L here for the terminal value? Does that need to be counted towards the output rows, since it's not really a row?
No, it is for the straggling `currentCombinedValue` that is being added to the batch on the line before, which comes from here if the inputs were exhausted: https://github.com/apache/incubator-druid/blob/master/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java#L537
LGTM
@@ -62,6 +62,9 @@ public OutType get()
catch (Exception e) {
  t.addSuppressed(e);
}
if (t instanceof RuntimeException) {
what prompted these changes?
Ah, the `withBaggage` (https://github.com/apache/incubator-druid/pull/8981/files#diff-84792f9d3cefe47cbb471669dce2a276R145) caused all of the `RuntimeException`s being thrown to be wrapped in an additional `RuntimeException`, which didn't seem very useful. Otherwise, I would have needed to change the expected cause of the expected exception in the tests to expect `RuntimeException` instead of `TimeoutException`.
I see; in the surrounding code, `Throwables.propagateIfPossible(t);` is used to get that behavior instead.
Oops, that's a good point, I just threw these in here to fix my tests. Looking closer I will have a pass to clean this up a bit.
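For reference, the rethrow behavior being discussed can be sketched outside of Druid like this. `unwrapOrWrap` is a hypothetical helper that mirrors what `Throwables.propagateIfPossible(t)` followed by a final wrap achieves (no Druid or Guava code is used):

```java
// Sketch of the rethrow pattern under discussion: unchecked throwables pass
// through unwrapped, so tests still see the original exception (e.g. a
// TimeoutException cause) instead of an extra RuntimeException layer; only
// checked throwables get wrapped, and exactly once.
final class Rethrow
{
  static RuntimeException unwrapOrWrap(Throwable t)
  {
    if (t instanceof RuntimeException) {
      throw (RuntimeException) t;
    }
    if (t instanceof Error) {
      throw (Error) t;
    }
    // checked exception: wrap once, preserving it as the cause
    return new RuntimeException(t);
  }
}
```

Callers would write `throw Rethrow.unwrapOrWrap(t);` so the compiler sees a terminal statement on every path.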
LGTM overall, non-blocking commentary....
actually a periodic snapshot of FJP state would be nice to see to identify potential bugs, e.g. number of running threads, total number of threads, queued work, etc. It sucks that to enable these metrics, I would have to write an extension. I haven't read through the thread in #6559 yet but will hopefully check that out sometime this week.
I definitely agree here, I've been looking at the recently added Dropwizard emitter for inspiration, to see if it would maybe make sense to have something based on Dropwizard (or something like that) available in core to allow re-use for any sorts of periodic/rate driven metrics we might want to collect. Though rather than the Dropwizard emitter, instead a common core piece would collect these metrics and periodically emit the snapshot of their current values to whatever the actual emitters are. I plan to look into this deeper sometime after this PR.
I agree here as well; I think maybe having some sorts of common profile implementations of
for periodic collection we already have the
That's pretty much what I had in mind 👍
@clintropolis not sure if you plan to add the periodic metrics in this PR or later, current changes LGTM, so feel free to proceed whichever way you need.
Thanks, I'm going to merge this and do in a follow-up 👍
Description
This PR is a follow-up to #8578, adding a handful of query metrics that I believe are interesting, but taking the conservative approach in that all of them are off by default, meaning a custom extension implementing `QueryMetrics` is necessary to actually emit them. `ParallelMergeCombiningSequence` is in `druid-core` whereas `QueryMetrics` and friends are in `druid-processing`, so this is done mechanically via a `Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics>` that is supplied to the sequence, where `ParallelMergeCombiningSequence.MergeCombineMetrics` is the type in which all of the metrics from the fork join tasks are accumulated. This allows the consumer, `CachingClusteredClient` in our case, to define how to report the metrics.

New `QueryMetrics` methods:

- `reportParallelMergeParallelism` - Reports the number of parallel tasks the broker used to process the query during parallel merge
- `reportParallelMergeInputSequences` - Reports the total number of input sequences processed by the broker during parallel merge
- `reportParallelMergeInputRows` - Reports the total number of input rows processed by the broker during parallel merge
- `reportParallelMergeOutputRows` - Reports the broker total number of output rows after merging and combining input sequences
- `reportParallelMergeTaskCount` - Reports the broker total number of fork join pool tasks required to complete the query
- `reportParallelMergeTotalCpuTime` - Reports the broker total CPU time in nanoseconds where fork join merge combine tasks were doing work

Additionally, since parallelism will always be a fixed range of values between 1 and the number of cores, it can also be added as a dimension instead through `QueryMetrics.parallelMergeParallelism`.
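The `Consumer`-based wiring described above can be sketched like this; all class names below are simplified stand-ins, not the actual Druid types:

```java
import java.util.function.Consumer;

// Stand-in for MergeCombineMetrics: a plain holder the merge accumulates into.
final class ToyMergeMetrics
{
  long inputRows;
  long outputRows;
}

// Stand-in for ParallelMergeCombiningSequence: it lives in a low-level module,
// so it reports through a Consumer rather than depending on QueryMetrics.
final class ToyMergingSequence
{
  private final Consumer<ToyMergeMetrics> reporter;

  ToyMergingSequence(Consumer<ToyMergeMetrics> reporter)
  {
    this.reporter = reporter;
  }

  void run()
  {
    ToyMergeMetrics metrics = new ToyMergeMetrics();
    metrics.inputRows = 100;   // pretend the merge consumed 100 input rows
    metrics.outputRows = 10;   // ...and combined them down to 10 output rows
    reporter.accept(metrics);  // the caller decides how to emit these
  }
}
```

The dependency direction is the point of the design: the low-level sequence never names the metrics API, so the CachingClusteredClient-style caller can translate the accumulated values into whatever `report*` calls it likes.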
I did not document these metrics because they are off by default, and I don't want to give operators false hope before they dig in and realize they need to write a custom extension. This omission is in favor of someday refining this process, so that maybe we bundle several different 'profiles' that enable emitting a set of metrics, or some other system that allows operators to customize without a custom extension.
Also absent are any sort of aggregate metrics, such as pool utilization over some periodic collection interval or whatever. I would like to look into this as a follow-up, since it seems like there are potentially many metrics that would maybe make sense to collect like this, so I'd like to think a bit harder about how to do this so it's not a one-off solution.
This PR also modifies the parallel merge config to disable using the fork join pool if the computed level of pool parallelism isn't more than 2, since at least 3 tasks are needed to do the two-layer merge in parallel.
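The threshold logic amounts to something like the following sketch (not the actual `DruidProcessingConfig` code):

```java
// Sketch: fall back to a non-parallel merge when computed parallelism is 2 or
// less, since per the PR description the two-layer merge needs at least 3
// concurrent tasks to actually run its layers in parallel.
final class ParallelMergeDecision
{
  static boolean useParallelMerge(int computedParallelism)
  {
    return computedParallelism > 2;
  }
}
```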
This PR has:
Key changed/added classes in this PR

- `ParallelMergeCombiningSequence`
- `QueryMetrics`
- `DruidProcessingConfig`