-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce TaskContextReport for reporting task context #16041
Conversation
default String getLabel() | ||
{ | ||
return getType(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since a label is a subjective thing, I can imagine Druid operators wanting to implement their own custom labeling strategy.
Have you considered adding an interface that is extensible to allow Druid operators to define their own labeling? I think this will give operators the flexibility to define their own labels by implementing an extension. Druid can have a default labeling strategy that does something like what is being proposed in this change.
Can you please update the PR with a description of how the interface is expected to work, and any design decisions that were made in choosing the interface as well as docs that describe where the labels are emitted and how they are set.
Some things in particular I'm interested in knowing more about
- This label appears to be a system defined label. Is this change meant to support system and user defined labels eventually?
- Why is there just one label? Many times there is a reason to have multiple labels for something - why not have a list of labels?
- How are Druid operators meant to customize this?
- Where is the label expected to be emitted? Are there any future changes needed so that the label is in all the places it needs to be
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 on this but instead of a label, can we just make it a generic Map<String, Object>
that can be enriched however one feels?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@arunramani , if we want to have a generic Map<String, Object>
, wouldn't the existing task context suffice for that?
As @suneet-s suggests, an extensible labeling mechanism is worth a thought. For simpler use cases though, we could simply try to add a new task context parameter as it is already a flexible structure that allows Druid operators complete control over their tasks. It has the added benefit of not having to educate operators on new mechanisms.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kfaraz copying the task context over to the report would be an immediate win and might actually take care of the issue. I was thinking more about any context that needs to be injected after a spec has been planned, which will require some kind of extension point.
indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
Outdated
Show resolved
Hide resolved
indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceModuleHelper.java
Outdated
Show resolved
Hide resolved
indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceModuleHelper.java
Outdated
Show resolved
Hide resolved
@@ -240,13 +241,14 @@ public TaskStatus runTask(final TaskToolbox toolbox) throws Exception | |||
injector.getInstance(Key.get(ServiceClientFactory.class, EscalatedGlobal.class)); | |||
final OverlordClient overlordClient = injector.getInstance(OverlordClient.class) | |||
.withRetryPolicy(StandardRetryPolicy.unlimited()); | |||
final TaskIdentitiesProvider taskIdentitiesProvider = injector.getInstance(TaskIdentitiesProvider.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use the provider to get ALL instances of TaskIdentitiesProvider
and then provide that as a list to the controller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There should be only one type of provider configured and in use, otherwise the usage will become too complicated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could be implemented in a way that isn't so complicated. This is a tagging provider and it makes sense to have multiple providers depending on what the implemented finds useful. We do this with loggers and emitters and such. Does this need to be any different?
* performance and characteristics clearer in reports and summaries. | ||
*/ | ||
@ExtensionPoint | ||
public interface TaskIdentitiesProvider |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think TaskInformationProvider
a simpler and cleaner name. Did we consider that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe Information
might be too broad and doesn't capture the specific intent of this ExtensionPoint, which is focused on delivering task identity details for monitoring and management purposes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The javadoc here talks about adding tags to tasks. Based on that statement alone - I think this should be called TaskTagsProvider
. This can then build on top of an existing concept of user provided tags for tasks that was added in #13760
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Design this as an ExtensionPoint, it will be used for areas other than tags, so don't want to limit what it can do in the name of the class.
@Override | ||
public Map<String, Object> getTaskMetricTags(Task task) | ||
{ | ||
String taskIdentifier = task.getType(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we consider adding a groupType
to the task object? Parsing the group IDs name is pretty fragile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a valuable suggestion, however, implementing it could require modifications across numerous task types. I'd like to keep the scope of this PR focused, and consider that as potential follow-up work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this feature @YongGang. I like the concept and can see some potential uses. I'm -1 on this current implementation. The biggest challenges changes I'd like to see are:
- Updates to the interface so that it is focused on system generated tags instead of all tags
- It looks like in this current form, the tags are only applied on MSQ tasks. I think this should be hooked into all tasks given what the interface talks about.
- Integration tests are missing so it is possible this functionality could break
- Docs are missing indicating where the system tags could show up.
* performance and characteristics clearer in reports and summaries. | ||
*/ | ||
@ExtensionPoint | ||
public interface TaskIdentitiesProvider |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The javadoc here talks about adding tags to tasks. Based on that statement alone - I think this should be called TaskTagsProvider
. This can then build on top of an existing concept of user provided tags for tasks that was added in #13760
import java.util.Map; | ||
|
||
/** | ||
* The TaskIdentitiesProvider interface helps add metric tags to tasks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* The TaskIdentitiesProvider interface helps add metric tags to tasks. | |
* The TaskIdentitiesProvider interface provides tags for a task that are reported in metrics and task reports. |
...xing-service/src/main/java/org/apache/druid/indexing/common/task/TaskIdentitiesProvider.java
Outdated
Show resolved
Hide resolved
...xing-service/src/main/java/org/apache/druid/indexing/common/task/TaskIdentitiesProvider.java
Outdated
Show resolved
Hide resolved
...xing-service/src/main/java/org/apache/druid/indexing/common/task/TaskIdentitiesProvider.java
Outdated
Show resolved
Hide resolved
|
||
Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS); | ||
Map<String, Object> metricTags = tags == null ? new HashMap<>() : new HashMap<>(tags); | ||
metricTags.put(TASK_IDENTIFIER, taskIdentifier); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should add this for tag for tasks that do not need it
} | ||
} | ||
|
||
Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment in interface about making this just about system generated tags.
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Outdated
Show resolved
Hide resolved
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Outdated
Show resolved
Hide resolved
indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceModuleHelper.java
Outdated
Show resolved
Hide resolved
aa1cbf1
to
fb77fc7
Compare
Make some design changes:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@YongGang , I took another look at the PR, please find my notes below.
Feedback on the current implementation
As it currently stands, I concur with @suneet-s on the following points:
TaskIdentitiesProvider
is not an appropriate name as the information has nothing to do with the identity of the task. It is only meant to add some extra optional information to theTask
(most likely in the task context). Something likeTaskContextEnricher
or something or evenTaskTagsProvider
would be more appropriate.taskIdentifier
is not an appropriate name if it is going to take values such ascompact
ormsq_select
.ingestionType
or something else would be more apt.
Also,
- Why do tags need to be a part of the task reports? User specified tags would already be present in the task payload (in the context).
- If there is a clear use case of system-generated tags, they should probably go under a separate header and not inside
ingestionStatsAndErrors
.
Suggested implementation to meet the current requirements
I am not entirely convinced on why we need the new interface. It seems a bit of an overkill, at least for the current requirements.
The only usage of the new interface right now is to populate taskIdentifier
as a dimension while emitting the heartbeat metric.
Why not do just that? Essentially the following logic would live in CliPeon
(with more appropriate dimension name and values):
if (!Strings.isNullOrEmpty(groupId)) {
if (groupId.startsWith("coordinator-issued_compact")) {
taskIdentifier = "compact";
} else if (groupId.startsWith("coordinator-issued_kill")) {
taskIdentifier = "kill";
}
}
It is already hardcoded in the proposed implementation, so we might as well just hard-code it in CliPeon
for the time being.
Note on design
I think it is a little premature for us to be designing the TaskIdentitiesProvider
extension right now as we don't have clarity on a real world use case, other than adding dimensions to a metric.
Once we get some clarity on a use case (a reasonable way to justify this would be an implementation of TaskIdentitiesProvider
other than the default impl), we would be able to get consensus on the design. Until then, we should simply add a new dimension to the heartbeat metric.
This is especially important since you intend to expose this as an extension point. Extension points need a little more thought when designing as they cannot be changed freely once defined due to backwards compatibility reasons.
TaskReport.buildTaskReports(new MSQTaskReport(id(), taskReportPayload)) | ||
TaskReport.buildTaskReports(new MSQTaskReport( | ||
id(), | ||
task.getContextValue(DruidMetrics.TAGS), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does the report need the tags? Aren't these meant only for metrics?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you can put tags as well here. But the correct location should be
MSQTaskReportPayload
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the tags would be in the context, and the report already has the task context so I think no change should be required here no ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it would be already part of context so this should not be required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tags can contain some specific info about the task such as msq_async_download
can be a tag, it will be generated from code based on task spec not provided by user through context.
Tags are also included in task report as external systems can read task report and rely on the tags to know the purpose of the task and take actions based on it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the tags would be in the context, and the report already has the task context so I think no change should be required here no ?
I think you mean task query context
field, it's different from Task.context
which all task types can have on the top level.
@kfaraz , thanks for your input. The design of
These are real questions we have and it's tightly coupled with the business requirements, that's why it's designed as an |
indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskContextEnricher.java
Fixed
Show fixed
Hide fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some feedback.
...ing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTaskContextEnricher.java
Outdated
Show resolved
Hide resolved
indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskContextEnricher.java
Outdated
Show resolved
Hide resolved
indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java
Outdated
Show resolved
Hide resolved
TaskReport.buildTaskReports(new MSQTaskReport( | ||
id(), | ||
taskReportPayload | ||
)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Formatting:
TaskReport.buildTaskReports(new MSQTaskReport( | |
id(), | |
taskReportPayload | |
)) | |
TaskReport.buildTaskReports( | |
new MSQTaskReport(id(), taskReportPayload) | |
) |
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Outdated
Show resolved
Hide resolved
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Show resolved
Hide resolved
...lti-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQTaskReportPayload.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates @YongGang I have removed my -1, but there are still some changes that I think would be good.
Reading the PR, it seems like the main change now is that task contexts are now being passed from the user into the task report. Can you please update the title of the PR and the description to call this out as the key change.
The fact that the TaskContextEnricher interface can be leveraged in future patches is good to add in the description, but should not be part of the release notes as the interface is not yet marked with an ExtensionPoint annotation. Instead, you could describe an example of task context enriching that will be possible in future - like the example implementation done in your previous patch (adding a tag to identify all auto-compaction tasks instead of relying on matching the start of the group-id with coordinator-issued_compact
)
Also, can we please add docs to https://github.com/apache/druid/blob/master/docs/ingestion/tasks.md#completion-report documenting the fact that tags are now an expected part of the task report.
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Outdated
Show resolved
Hide resolved
indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java
Outdated
Show resolved
Hide resolved
Assert.assertTrue( | ||
reportData.getTaskContext() != null && !reportData.getTaskContext().isEmpty(), | ||
"Report data does not contain task context. Ensure that TaskContextEnricher is correctly bound." | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of just checking that the context is non empty - can we validate that certain fields we expect are there and contain the expected values?
Similar changes to existing integration tests for streaming, compaction and MSQ tasks please. I also noticed there are 2 AbstractITBatchIndexTest
classes, this one, and another one in the druid-it-cases
sub module. Please make a similar change there so that we get good integration test coverage that the task context is being included in the report for all the different test cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor pending comments, otherwise looks good.
indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskContextEnricher.java
Show resolved
Hide resolved
@@ -81,6 +82,22 @@ public void testSerdeOfKillTaskReport() throws Exception | |||
Assert.assertEquals(originalReport, deserializedReport); | |||
} | |||
|
|||
@Test | |||
public void testSerdeOfTaskContextReport() throws Exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding the test!
|
||
TaskContextReport taskContextReport = (TaskContextReport) indexer.getTaskReport(taskID).get(TaskContextReport.REPORT_KEY); | ||
|
||
Assert.assertFalse(taskContextReport.getPayload().isEmpty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assertion doesn't seem to add much value.
if (taskReport == null) { | ||
throw new ISE("Unable to fetch the status report for the task [%]", resultTaskStatus.getTaskId()); | ||
} | ||
TaskContextReport taskContextReport = (TaskContextReport) statusReport.get(TaskContextReport.REPORT_KEY); | ||
Assert.assertFalse(taskContextReport.getPayload().isEmpty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have some more meaningful assertions? If not relevant here, then we can just skip this assertion too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to assure TaskContextReport is generated for the task. Since the payload is task context which is hard to know the exact value for comparison so here it just check whether the field is populated.
TaskContextReport taskContextReport = | ||
(TaskContextReport) indexer.getTaskReport(taskID).get(TaskContextReport.REPORT_KEY); | ||
|
||
Assert.assertFalse(taskContextReport.getPayload().isEmpty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add some more concrete assertions or skip this one too.
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Show resolved
Hide resolved
Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will be unavailable to review further updates to this PR for the next 2 weeks, so I am marking my review as +1 so that my review is not blocking.
I do agree with @kfaraz's suggestion that more specific checks in the ITs for a few specific key value pairs in the context would be helpful.
Co-authored-by: Suneet Saldanha <suneet@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests need to be refined, but that can be addressed later too.
docs/operations/metrics.md
Outdated
field in the `context` field of the ingestion spec. `tags` is expected to be a map of string to object. | ||
The `tags` dimension enhances metrics, task reports, and the Peon service/heartbeat metric associated with ingestion tasks. These tags are derived from both the ingestion specification—when the `tags` field is specified within the task's context—and system-generated sources. The tags field within a task's `context` should be a map with string keys and object values, facilitating the inclusion of custom metadata. However, tags can also be automatically generated and added by the system, providing a comprehensive set of metadata for monitoring and analysis purposes. | ||
|
||
To further customize and enrich task metadata, developers can implement the `TaskContextEnricher` interface. By implementing custom logic within this interface, additional context fields can be introduced. This capability allows for enhanced observability and management of tasks, as these additional context fields can offer deeper insights into task execution, performance, and outcomes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also mention that TaskContextProvider runs in the critical path. So use carefully.
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Show resolved
Hide resolved
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
Show resolved
Hide resolved
* Facilitates sharing the controller's execution environment | ||
* and configurations with its associated worker tasks. | ||
*/ | ||
public static final String CTX_OF_CONTROLLER = "controllerCtx"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be configurable.
We should revert this. Having another configuration parameter does not make sense to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking on this:
#16041 (comment)
My follow up work will need to read the Controller's ctx when managing/scheduling Worker task as only Controller task has the full task info (e.g. hard to know if it's an async download task from the Worker task itself), these labels/tags need to be generated based on Controller task then pass on to Worker to guide the task execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just pass only the labels/tag to the worker task always. No need to add a new configuration parameter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, after thinking this again, this new param in Worker cxt is better to be removed for now.
Will put up a PR to do it on Monday. Thanks @cryptoe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -512,6 +516,8 @@ public boolean add(final Task task) | |||
SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION | |||
); | |||
|
|||
taskContextEnricher.enrichContext(task); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this is a very critical flow for druid. We cannot allow extensions to break this.
I would go one step further and add a future return type to the interface like .
class TaskEnrichContext{
Future<Void> enrichContext(Task task)
}
and then this code waits upto a max of 20 MS to proceed else timeout.
future.get(20MS)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a normal interface (instead of an extension point), the default implementation is Noop
.
Seems to me it will be overly protected by using try-catch and Future for such code.
@YongGang I have updated the release notes in the description to better depict the change. Please review it once. |
Description
This PR initiates enhancements in Druid's task management and monitoring framework. It integrates task context into a new
TaskContextReport
and aligns task context tags with metrics reporting for a unified overview.Additionally, the introduction of the
TaskContextEnricher
interface focuses on enriching tasks with detailed context information, thereby enhancing monitoring, reporting, and providing deeper insights into task dynamics. A potential future implementation of this interface could, for example, utilize the group-id prefixcoordinator-issued_compact
to classify tasks as part of auto-compaction, subsequently generating appropriate tags for easy identification.Release note
TaskContextReport
for reporting task context.TaskContextEnricher
interface which can be used in extensions to enrich taskContext.Key changed/added classes in this PR
TaskContextEnricher.java
an interface helps provide extra context info about tasksTaskContextReport.java
for reporting task contextThis PR has: