Moving averages for ingestion row stats #5748
5f85da5 to 2e02ce1
If these are becoming more common, would it make sense to have a Dropwizard-compatible metrics registry interface so we don't have to code our own on-heap metrics collectors?
@drcrallen I've not used Dropwizard before; do you have an example implementation in mind?
http://metrics.dropwizard.io/4.0.0/manual/core.html has a bunch of examples. Such a metrics interface is common among a few other projects: https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/metrics/Metrics.html and https://spark.apache.org/docs/latest/monitoring.html#metrics
Thanks, Dropwizard looks useful, I'll check it out.
@drcrallen I believe Dropwizard still uses on-heap space, but I agree it may be a good idea to use a common library for things like moving averages or histograms.
@b-slim Agreed, that's my main concern: that we would need to maintain a metrics collection library (histograms / moving averages / rate calculators) when there are already good and widely used ones out there.
3d7039b to 93a0f3f
@drcrallen Changed this to use Dropwizard metrics Meter objects to collect row stats (instead of FireDepartmentMetrics). Dropwizard has a dependency on slf4j 1.7.25 (https://github.com/dropwizard/metrics/blob/4.1-development/pom.xml), while Druid uses 1.7.12, but there shouldn't be any compatibility issues between those versions (https://www.slf4j.org/compatibility.html).
93a0f3f to a5e3913
a5e3913 to 78dd24e
@@ -37,6 +38,11 @@

SupervisorReport getStatus();

default Map<String, Object> getStats()
{
return null;
why not an empty map?
@@ -919,12 +920,14 @@ private TaskStatus generateAndPublishSegments(
) {
driver.startJob();

buildSegmentsMeters = new RowIngestionMeters();
final?
@@ -618,6 +617,7 @@ public void run()

Set<Integer> assignment = assignPartitionsAndSeekToNext(consumer, topic);

rowIngestionMeters = new RowIngestionMeters();
final?
@@ -965,6 +964,7 @@ private TaskStatus runInternalLegacy(final TaskToolbox toolbox) throws Exception
)
);

rowIngestionMeters = new RowIngestionMeters();
oh i see you are reusing it here ...
can we just reset the counters ?
those are different code paths (one is a "legacy" mode), so only one of them would get called in a task
I don't make this final and set this here because each Meter object has a "startTime" field that's set when the object is created, so I want the startTime to be as close to the actual ingestion work
@@ -254,6 +254,46 @@ public DateTime getStartTime(final String id)
}
}

public Map<String, Object> getMovingAverages(final String id)
can you add nullable annotation
I made this return an empty map instead for those cases
return null;
}
catch (IOException e) {
throw Throwables.propagate(e);
Can we start moving away from propagate? It is going to be removed from Guava.
Changed this and other Throwables.propagate calls in this file
@@ -216,7 +215,7 @@ public boolean resume(final String id)
}
catch (IOException | InterruptedException e) {
log.error("Exception [%s] while pausing Task [%s]", e.getMessage(), id);
throw Throwables.propagate(e);
throw new RuntimeException(e);
It is odd to log and throw. Such a thing usually leads to double logging. Would it make more sense just to put the log message in the RuntimeException?
Removed the log and added the message to the RuntimeException
@@ -622,7 +663,7 @@ private FullResponseHolder submitRequest(
Thread.sleep(sleepTime);
}
catch (InterruptedException e2) {
Throwables.propagate(e2);
throw new RuntimeException(e2);
This needs to set Thread.currentThread().interrupt()
Added the interrupt call
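For readers following along, here is a minimal sketch of the pattern requested above: restore the thread's interrupt flag before rethrowing. The class and method names (`InterruptSketch`, `sleepOrFail`) are hypothetical and are not the code in this patch.

```java
class InterruptSketch
{
  // Hypothetical helper: sleeps, and if interrupted, restores the interrupt
  // flag before wrapping the exception in a RuntimeException.
  static void sleepOrFail(long sleepTimeMillis)
  {
    try {
      Thread.sleep(sleepTimeMillis);
    }
    catch (InterruptedException e) {
      // Thread.sleep clears the interrupt flag when it throws, so set it
      // again to let callers further up the stack observe the interruption.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}
```

Without the `interrupt()` call, code higher in the stack that polls the interrupt status would not see that the thread was asked to stop.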
import java.util.Map;

public class RowIngestionMeters
without increasing the scope too much, can this be a bind-able interface with a DropwizardRowIngestionMeter as the default bound thing? this would allow extensions to specify their own if desired.
Essentially, this class would be renamed to DropwizardRowIngestionMeter and RowIngestionMeters would be an interface with the methods you need
Sure, I'll look into making this extensible
The RowIngestionMeters are now created by an injected RowIngestionMetersFactory that can be replaced by extensions
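A rough sketch of the shape this describes, assuming a much smaller interface than the real one: the method names (`incrementProcessed`, `getProcessed`, `createRowIngestionMeters`) and the classes `CounterRowIngestionMeters` and `SketchTask` are illustrative assumptions, and injection is shown as a plain constructor argument rather than an actual binding.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative interface; the method names are assumptions, not the patch's API.
interface RowIngestionMeters
{
  void incrementProcessed();

  long getProcessed();
}

// The injected piece: an extension could supply a different factory.
interface RowIngestionMetersFactory
{
  RowIngestionMeters createRowIngestionMeters();
}

// Hypothetical default implementation backed by a plain counter instead of Dropwizard.
class CounterRowIngestionMeters implements RowIngestionMeters
{
  private final AtomicLong processed = new AtomicLong();

  @Override
  public void incrementProcessed()
  {
    processed.incrementAndGet();
  }

  @Override
  public long getProcessed()
  {
    return processed.get();
  }
}

// Hypothetical task: it holds the factory and creates meters right before doing work.
class SketchTask
{
  private final RowIngestionMetersFactory factory;

  SketchTask(RowIngestionMetersFactory factory)
  {
    this.factory = factory;
  }

  long run(int rows)
  {
    RowIngestionMeters meters = factory.createRowIngestionMeters();
    for (int i = 0; i < rows; i++) {
      meters.incrementProcessed();
    }
    return meters.getProcessed();
  }
}
```

Usage would look like `new SketchTask(CounterRowIngestionMeters::new).run(5)`. Creating the meters inside `run` rather than in a field initializer keeps any creation-time timestamp close to the start of the work, which matches the concern raised earlier in the thread about the Meter start time.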
return totals;
}

public Map<String, Object> getMovingAverages()
these don't have strong guarantees. is there any way the method invocations can have something that has guarantees around it?
What guarantees do you think this method should provide?
@@ -143,7 +143,7 @@ private static String makeTaskId(RealtimeAppenderatorIngestionSpec spec)
private volatile FireDepartmentMetrics metrics = null;

@JsonIgnore
private TaskMetricsGetter metricsGetter;
private RowIngestionMeters rowIngestionMeters;
if this is injectable this could simply be @JacksonInject. Alternatively, it might make more sense to have a Factory object as the thing that is injected, and create the meters objects as needed in the classes.
I went with injecting a factory for the meters
cool
can this be final and created in the constructor?
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
Throwables.propagateIfInstanceOf(e.getCause(), ChannelException.class);
throw Throwables.propagate(e);
if (e.getCause() instanceof IOException || e.getCause() instanceof ChannelException) {
Can this be more explicit? Things that catch Exception are dangerous.
For example, catching catch(IOException ioe){throw ioe;} first should handle the IOE cases. Then if it is the httpClient.go call that could be the issue, things that would need the IOException unwrapped can be handled explicitly. Then catch(Exception e){throw new RuntimeException("Unhandled exception", e);} is probably good enough to handle all the other items that might crop up. Also make sure nothing here can throw InterruptedException or add a special handler for it please.
Things that catch(Exception e) can cause issues :-(
Added an explicit handler for IOException and ChannelException, added a handler for InterruptedException
@@ -622,7 +665,8 @@ private FullResponseHolder submitRequest(
Thread.sleep(sleepTime);
}
catch (InterruptedException e2) {
Throwables.propagate(e2);
Thread.currentThread().interrupt();
throw new RuntimeException(e2);
suggest e.addSuppressed(e2) and throwing the original exception
Changed to throw original exception with addSuppressed
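A minimal sketch of what that change could look like, assuming the original failure is an `IOException` and that the wait happens between retries. The names `SuppressedSketch` and `waitBeforeRetry` are hypothetical and the real `submitRequest` method is more involved.

```java
import java.io.IOException;

class SuppressedSketch
{
  // Hypothetical retry step: after a failed request, wait before retrying.
  // If the wait is interrupted, the original failure stays the primary
  // exception and the interruption is attached to it as a suppressed exception.
  static void waitBeforeRetry(IOException original, long sleepTimeMillis) throws IOException
  {
    try {
      Thread.sleep(sleepTimeMillis);
    }
    catch (InterruptedException e2) {
      Thread.currentThread().interrupt();
      original.addSuppressed(e2);
      throw original;
    }
  }
}
```

This keeps the root cause (the failed request) at the top of the stack trace, while the interruption still shows up under "Suppressed:" when the exception is printed.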
@@ -633,7 +677,7 @@ private FullResponseHolder submitRequest(
}
catch (Exception e) {
log.warn(e, "Exception while sending request");
throw e;
throw new RuntimeException(e);
doh... another Exception catch
removed the RuntimeException wrapper here
try {
return getCurrentTotalStats();
}
catch (Exception e) {
does this need to catch all exceptions?
InterruptedException is not handled properly here
Added handler for InterruptedException and another handler for ExecutionException and TimeoutException, removed general Exception catch
for (String taskId : group.taskIds()) {
futures.add(
Futures.transform(
taskClient.getMovingAveragesAsync(taskId),
is this really so compute intensive that parallel even helps?
or is this based on network IO?
Either way please add a javadoc on the method
this is network IO, the supervisor is retrieving stats from each of the tasks it manages
added javadoc
allStats.put(String.valueOf(groupId), groupStats);
}

final Map<String, Object> eGroupStats = groupStats;
this is concurrently modified from who knows how many threads. Can the Futures.allAsList result be used instead? something like a Stream of Pair<String, Map<String, Object>> with a Collectors.toMap should work
changed this to return a result object from each future, map is built only after all results are retrieved
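To illustrate the idea, here is a sketch in which each future returns its own result object and the map is assembled on the calling thread once all results are in. It uses the JDK's `CompletableFuture` as a stand-in for the Guava `ListenableFuture` used in the patch, and `TaskStats`, `fetchStatsAsync`, and the placeholder stats are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class CollectStatsSketch
{
  // Hypothetical result holder: one per task, returned by each future.
  static final class TaskStats
  {
    final String taskId;
    final Map<String, Object> stats;

    TaskStats(String taskId, Map<String, Object> stats)
    {
      this.taskId = taskId;
      this.stats = stats;
    }
  }

  // Hypothetical stand-in for the network call that asks a task for its stats.
  static CompletableFuture<TaskStats> fetchStatsAsync(String taskId)
  {
    return CompletableFuture.supplyAsync(
        () -> new TaskStats(taskId, Collections.<String, Object>singletonMap("processed", taskId.length()))
    );
  }

  static Map<String, Map<String, Object>> collect(List<String> taskIds)
  {
    // Start all requests first so they run concurrently.
    List<CompletableFuture<TaskStats>> futures =
        taskIds.stream().map(CollectStatsSketch::fetchStatsAsync).collect(Collectors.toList());

    // join() waits for each future; no shared map is touched by worker threads.
    // Collectors.toMap throws on duplicate keys, so task ids are assumed unique.
    return futures.stream()
                  .map(CompletableFuture::join)
                  .collect(Collectors.toMap(r -> r.taskId, r -> r.stats));
  }
}
```

Because the futures only produce values and never write to a shared collection, no concurrent map or extra locking is needed.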
fifteenMinute.put(UNPARSEABLE, unparseable.getFifteenMinuteRate());
fifteenMinute.put(THROWN_AWAY, thrownAway.getFifteenMinuteRate());

movingAverages.put("1m", oneMinute);
can these be static final strings somewhere?
Made these static final strings
);
}
}

returnMap.put("totals", totalsMap);
returnMap.put("averages", averagesMap);
movingAverages?
renamed these to movingAverages
@@ -496,16 +510,16 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
private Map<String, Object> getTaskCompletionRowStats()
{
Map<String, Object> metrics = Maps.newHashMap();
if (determinePartitionsMetricsGetter != null) {
if (determinePartitionsMeters != null) {
suggest moving both of these to final objects instead of having them nullable
moved to constructor
c279755 to 3d26d5a
3d26d5a to 77b183b
LGTM
👍
@drcrallen Did you have any more comments on this?
a61f381 to 5e3cd6d
5e3cd6d to ad71384
@b-slim @drcrallen The patch had a bug where metrics emission was broken because the RealtimeMetricsMonitor was not hooked up to the new ingestion meters; I added a commit that fixes that. Can you take another look at this PR?
@@ -511,8 +513,8 @@ private TaskStatus runInternal(final TaskToolbox toolbox) throws Exception
null
);
fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
metricsGetter = new FireDepartmentMetricsTaskMetricsGetter(fireDepartmentMetrics);
toolbox.getMonitorScheduler().addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics));
toolbox.getMonitorScheduler()
FireDepartmentMetricsTaskMetricsGetter is no longer used in any place. Please remove it.
Removed the class
true
);
return response.getContent() == null || response.getContent().isEmpty()
? ImmutableMap.of()
nit: Collections.emptyMap() can be better to avoid unnecessary map creation.
Changed this to Collections.emptyMap()
: jsonMapper.readValue(response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
}
catch (NoTaskLocationException e) {
return ImmutableMap.of();
Same here.
Changed this to Collections.emptyMap()
}
}

public ListenableFuture<Map<String, Object>> getMovingAveragesAsync(final String id)
I think it's better to convert all ListenableFuture to CompletableFuture in another PR if needed.
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("getStats() interrupted.");
Please add ie to the log.
Added ie to the log
}

List<StatsFromTaskResult> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
for (StatsFromTaskResult result : results) {
The list of results is in the same order as the input list, and if any of the provided futures fails or is canceled, its corresponding position will contain null (which is indistinguishable from the future having a successful value of null).
result can be null if we use Futures.successfulAsList().
Added a null check for result, an error is logged if result is null
List<StatsFromTaskResult> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
for (StatsFromTaskResult result : results) {
Map<String, Object> groupMap = (Map<String, Object>) allStats.putIfAbsent(result.getGroupId(), Maps.newHashMap());
Map<String, Object> groupMap = (Map<String, Object>) allStats.putIfAbsent(result.getGroupId(), Maps.newHashMap());
if (groupMap == null) {
groupMap = (Map<String, Object>) allStats.get(result.getGroupId());
}
Does this mean Map<String, Object> groupMap = (Map<String, Object>) allStats.computeIfAbsent(result.getGroupId(), k -> new HashMap<>());?
Changed this to use computeIfAbsent
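For comparison, a small self-contained sketch of the two idioms, using a typed map to avoid the casts. The class and method names are hypothetical. Note that `computeIfAbsent` takes a mapping function rather than a value.

```java
import java.util.HashMap;
import java.util.Map;

class GroupMapSketch
{
  // putIfAbsent returns the previous value, which is null when the key was
  // absent, so a follow-up get() is needed to obtain the newly inserted map.
  static Map<String, Object> withPutIfAbsent(Map<String, Map<String, Object>> allStats, String groupId)
  {
    Map<String, Object> groupMap = allStats.putIfAbsent(groupId, new HashMap<>());
    if (groupMap == null) {
      groupMap = allStats.get(groupId);
    }
    return groupMap;
  }

  // computeIfAbsent returns the current value, whether it already existed or
  // was just created by the mapping function, so no second lookup is needed.
  static Map<String, Object> withComputeIfAbsent(Map<String, Map<String, Object>> allStats, String groupId)
  {
    return allStats.computeIfAbsent(groupId, k -> new HashMap<>());
  }
}
```

The `computeIfAbsent` form also avoids allocating a new map when the key is already present, since the function is only invoked for missing keys.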
@@ -181,7 +180,7 @@ public void testInternalServerError()
verifyAll();
}

@Test(expected = IAE.class)
@Test(expected = RuntimeException.class)
I think RuntimeException is too broad, and this check might give a false positive. Would you improve this test by adding ExpectedException and checking the cause and the error message of the RuntimeException?
Changed this and other tests here to use ExpectedException and also check the exception messages
* FireDepartmentMetrics are not currently supported, so we continue to use FireDepartmentMetrics alongside
* RowIngestionMeters to avoid unnecessary overhead from maintaining these moving averages.
*/
public interface RowIngestionMeters
Would you explain why this is an interface? Is it for smoothly migrating to some other libraries instead of Dropwizard in the future?
It's an interface so that extensions can provide custom implementations, see #5748 (comment)
Thanks.
Does it make sense to make this an ExtensionPoint?
Added @ExtensionPoint
import java.util.Map;

/**
* Replaces the old RealtimeMetricsMonitor for indexing tasks that use a single FireDepartment, with changes to
Would you add a comment to RealtimeMetricsMonitor as well that RealtimeMetricsMonitor is an old one and TaskRealtimeMetricsMonitor is recommended to use instead?
Added a comment to RealtimeMetricsMonitor
@jihoonson thanks for the review!
LGTM. @jon-wei thanks!
* Moving averages for ingestion row stats
* PR comments
* Make RowIngestionMeters extensible
* test and checkstyle fixes
* More PR comments
* Fix metrics
* Add some comments
* PR comments
* Comments
This PR adds optional moving average reports to the "rowStats" worker chat handler endpoint, for IndexTask, KafkaIndexTask, and AppenderatorRealtimeIndexTask.
Moving average collection can be configured via the "movingAverageWindowSizeMillis" and "movingAverageNumWindows" properties in the tuning configs for these tasks. By default these are 1000 and 60 (record the current stats every 1s, and store 60 of these records).
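As an illustration of the windowed scheme this paragraph describes (a snapshot every window, with a fixed number of snapshots retained), here is a JDK-only sketch. It is one possible reading of the description, not the patch's implementation: the review thread shows Dropwizard rate getters such as getFifteenMinuteRate() being used, and the class name, method names, and rate formula below are assumptions.

```java
class WindowedAverageSketch
{
  private final long windowSizeMillis;
  private final long[] snapshots; // ring buffer of cumulative row counts
  private int count = 0;          // snapshots recorded so far, capped at capacity
  private int next = 0;           // index where the next snapshot is written

  WindowedAverageSketch(long windowSizeMillis, int numWindows)
  {
    if (numWindows < 2) {
      throw new IllegalArgumentException("need at least two windows to compute a rate");
    }
    this.windowSizeMillis = windowSizeMillis;
    this.snapshots = new long[numWindows];
  }

  // Assumed to be called once per window with the cumulative rows processed so far.
  void record(long cumulativeRows)
  {
    snapshots[next] = cumulativeRows;
    next = (next + 1) % snapshots.length;
    if (count < snapshots.length) {
      count++;
    }
  }

  // Average rows per second between the oldest and newest retained snapshots.
  double rowsPerSecond()
  {
    if (count < 2) {
      return 0.0;
    }
    int newest = (next - 1 + snapshots.length) % snapshots.length;
    // Until the buffer wraps, the oldest snapshot is at index 0; afterwards it
    // is the slot that will be overwritten next.
    int oldest = count < snapshots.length ? 0 : next;
    long rows = snapshots[newest] - snapshots[oldest];
    double seconds = (count - 1) * windowSizeMillis / 1000.0;
    return rows / seconds;
  }
}
```

With the defaults mentioned above (1000 and 60), such a buffer would cover roughly the last minute of ingestion once it has filled.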
For Kafka indexing, the supervisor also provides an endpoint at /druid/indexer/v1/supervisor/{id}/stats that will collect stats from each of its indexing tasks and present them as a unified report, as a convenience.