Add support for task reports, upload reports to deep storage #5524
@jon-wei left a few comments.
One more comment: the way taskReports are processed looks complicated. TaskStatusWithReports is returned by a task upon completion, but the two variables inside it are handled differently. TaskStatus is returned to the overlord as is, while taskReport is uploaded to deep storage and stripped out of TaskStatusWithReports. As a result, from the perspective of overlords, taskReport is always empty. This is intended to avoid putting heavy pressure on ZK, but since they are handled differently, I think it's better not to keep them in a single class.
One possible alternative is to provide an interface to tasks, such as adding a taskReporter to TaskToolbox. Each task can use the taskReporter if it has something to report. What do you think?
final Path path = getTaskReportsFileFromId(taskId);
log.info("Writing task reports to: %s", path);
pushTaskFile(path, reportFile);
log.info("Wrote task reports to: %s", path);
Just out of curiosity, it looks like only this type of taskLogs writes two log lines, one before and one after pushing task reports. Is this intended?
pushTaskLog also has two logs there, so I did the same.
@PathParam("taskid") final String taskid
)
{
  try {
Doesn't this API need to check authentication and authorization?
It's handled by @ResourceFilters(TaskResourceFilter.class) on the method.
TaskStatusWithReports taskStatusWithReports = (TaskStatusWithReports) taskStatus;
final File reportsFileParent = reportsFile.getParentFile();
if (reportsFileParent != null) {
  reportsFileParent.mkdirs();
FileUtils.forceMkdir() would be better because it has some error checks.
Changed to forceMkdir()
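To illustrate the concern: File.mkdirs() only returns false on failure, so an unchecked call can silently leave the directory missing, whereas FileUtils.forceMkdir() from Apache Commons IO throws an IOException. The sketch below shows the same fail-loudly behavior using only the JDK, so it runs without extra dependencies. The class ReportDirs and the method ensureParentDir are hypothetical names for this example and are not part of the PR.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

// Hypothetical helper, not code from the PR.
class ReportDirs
{
  // Ensure the parent directory of a report file exists, failing loudly
  // instead of ignoring a false return value as a bare mkdirs() call would.
  static void ensureParentDir(File reportsFile)
  {
    final File parent = reportsFile.getParentFile();
    if (parent == null) {
      return; // path has no parent component, nothing to create
    }
    try {
      // No-op if the directory already exists; throws if the path exists
      // as a regular file or the directory cannot be created.
      Files.createDirectories(parent.toPath());
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Calling ensureParentDir twice on the same file is safe, since an existing directory is not treated as an error.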
final File statusFileParent = statusFile.getParentFile();
if (statusFileParent != null) {
  statusFileParent.mkdirs();
}
jsonMapper.writeValue(statusFile, taskStatus);
Unnecessary change.
Removed empty line
@@ -178,7 +178,7 @@ default int getPriority()
    *
    * @throws Exception if this task failed
    */
-  TaskStatus run(TaskToolbox toolbox) throws Exception;
+  TaskStatusWithReports run(TaskToolbox toolbox) throws Exception;
This might break the compatibility with the existing third-party task implementations.
Reverted this to just TaskStatus
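A minimal sketch of why keeping TaskStatus in the interface signature preserves compatibility: Java allows an implementation to narrow the return type (covariant return), so a reporting task can still return the subclass, while existing implementations that return plain TaskStatus keep compiling. All classes below are simplified stand-ins written for this example, not the actual Druid types; in particular, run() takes no toolbox here.

```java
import java.util.Collections;
import java.util.Map;

// Simplified stand-ins for illustration only.
class TaskStatus
{
  private final String id;
  private final String state;

  TaskStatus(String id, String state)
  {
    this.id = id;
    this.state = state;
  }

  String getId() { return id; }

  String getState() { return state; }
}

class TaskStatusWithReports extends TaskStatus
{
  private final Map<String, String> taskReports;

  TaskStatusWithReports(String id, String state, Map<String, String> taskReports)
  {
    super(id, state);
    this.taskReports = taskReports;
  }

  Map<String, String> getTaskReports() { return taskReports; }
}

interface Task
{
  // Unchanged signature: existing third-party tasks keep compiling.
  TaskStatus run() throws Exception;
}

// An existing implementation written against the original interface.
class LegacyTask implements Task
{
  @Override
  public TaskStatus run()
  {
    return new TaskStatus("legacy-task", "SUCCESS");
  }
}

// A newer implementation may narrow the return type (covariant return).
class ReportingTask implements Task
{
  @Override
  public TaskStatusWithReports run()
  {
    return new TaskStatusWithReports(
        "reporting-task",
        "SUCCESS",
        Collections.singletonMap("rowStats", "processed=100")
    );
  }
}
```

Had the interface itself declared TaskStatusWithReports, LegacyTask would no longer compile, which is the breakage the review comment points out.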
public class TaskStatusWithReports extends TaskStatus
{
  @JsonProperty
  private TaskStatus taskStatus;
TaskStatus is duplicated because TaskStatusWithReports already extends TaskStatus.
Dropped this as a saved field
Hm, I decided to keep the current implementation, since I think of the task log and the task reports as very similar things (unstructured vs. structured task logs) and felt like it would be nicer to handle the file writes/uploads together. Doing the uploads in a common place I felt was simpler than adding a file upload step to each individual task implementation.
I felt this was fine: anything above ExecutorLifecycle in the hierarchy of runners/tasks would only see a plain TaskStatus object, so there wouldn't be an empty "taskReports" field.
Hm, I decided to keep the current implementation, since I think of the task log and the task reports as very similar things (unstructured vs. structured task logs) and felt like it would be nicer to handle the file writes/uploads together. Doing the uploads in a common place I felt was simpler than adding a file upload step to each individual task implementation.
Oh, yeah, I definitely agree with you. My previous comment was about how to get taskReports from inside a task to the outside.
In the current implementation, taskReports are contained in TaskStatusWithReports, which extends TaskStatus. Since a task returns TaskStatusWithReports once it finishes its work, the report is returned together with the task's completion status. Then ExecutorLifecycle hijacks the TaskStatusWithReports returned from the task, writes only the report part to a file, and replaces it with a TaskStatus without reports. Once ForkingTaskRunner sees that the task has completed, it pushes the taskReports file to deep storage along with the task logs.
This looks quite complicated to me because:
- Since Task and TaskRunner are supposed to return TaskStatus, it isn't intuitive that some TaskRunners and Tasks return TaskStatusWithReports instead of TaskStatus while others do not.
- Even though a task returns TaskStatusWithReports, RemoteTaskRunner receives only TaskStatus without reports. This will confuse developers who are not familiar with how taskStatus is propagated from tasks to overlords, and it may cause bugs in the future if they modify the code around it.
Probably there is a simpler alternative: pass a sort of taskReporter to tasks (via TaskToolbox) that writes taskReports to a file at a predefined path. Then ForkingTaskRunner can read and push the taskReport file without the hijacking in ExecutorLifecycle. What do you think?
public TaskStatus getTaskStatus()
{
  return taskStatus;
  return new TaskStatus(
This might be possible by simply casting itself. Is this method supposed to always return a new instance? If so, the method name should be something like newTaskStatus() rather than getTaskStatus().
It's meant to return a new base TaskStatus object without the reports, so I renamed the method to "makeTaskStatusWithoutReports".
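A rough sketch of the distinction discussed here: an upcast does not change the runtime class of the object, so the reports would still travel with it, whereas building a fresh base-class instance actually drops them. The classes and fields below are simplified stand-ins assumed for illustration; only the method name makeTaskStatusWithoutReports comes from the comment above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration only.
class TaskStatus
{
  private final String id;
  private final String state;

  TaskStatus(String id, String state)
  {
    this.id = id;
    this.state = state;
  }

  String getId() { return id; }

  String getState() { return state; }
}

class TaskStatusWithReports extends TaskStatus
{
  private final Map<String, String> taskReports;

  TaskStatusWithReports(String id, String state, Map<String, String> taskReports)
  {
    super(id, state);
    // Defensive copy so later changes to the caller's map do not leak in.
    this.taskReports = Collections.unmodifiableMap(new HashMap<>(taskReports));
  }

  Map<String, String> getTaskReports() { return taskReports; }

  // Always builds a new base-class instance, hence "make" rather than "get".
  TaskStatus makeTaskStatusWithoutReports()
  {
    return new TaskStatus(getId(), getState());
  }
}
```

With this shape, `(TaskStatus) statusWithReports` still has runtime class TaskStatusWithReports, while `statusWithReports.makeTaskStatusWithoutReports()` yields an object whose runtime class is TaskStatus.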
@jihoonson good points, I've changed this to use a TaskReportFileWriter that's injected into the toolbox.
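The refactored shape can be sketched roughly as follows: the task writes its reports through a writer obtained from the toolbox and returns only a plain status, so nothing needs to be stripped from the return value afterwards. Every name here (ReportFileWriterSketch, ToolboxSketch, ReportingTaskSketch, getReportFileWriter) is hypothetical, and the "key=value" line format is an assumption chosen to keep the example dependency-free; the PR's actual TaskReportFileWriter may differ.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified stand-in for a report writer bound to one predefined file.
class ReportFileWriterSketch
{
  private final File reportsFile;

  ReportFileWriterSketch(File reportsFile)
  {
    this.reportsFile = reportsFile;
  }

  // Writes one "key=value" line per report, sorted by key.
  // A real writer would more likely serialize structured JSON.
  void write(Map<String, String> reports)
  {
    try {
      final File parent = reportsFile.getParentFile();
      if (parent != null) {
        Files.createDirectories(parent.toPath());
      }
      final List<String> lines = new ArrayList<>();
      for (Map.Entry<String, String> e : new TreeMap<>(reports).entrySet()) {
        lines.add(e.getKey() + "=" + e.getValue());
      }
      Files.write(reportsFile.toPath(), lines, StandardCharsets.UTF_8);
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

// Stand-in for a toolbox that hands the injected writer to tasks.
class ToolboxSketch
{
  private final ReportFileWriterSketch reportFileWriter;

  ToolboxSketch(ReportFileWriterSketch reportFileWriter)
  {
    this.reportFileWriter = reportFileWriter;
  }

  ReportFileWriterSketch getReportFileWriter() { return reportFileWriter; }
}

class ReportingTaskSketch
{
  // Returns only a status string; the report goes to the file, not the return value.
  String run(ToolboxSketch toolbox)
  {
    toolbox.getReportFileWriter().write(Collections.singletonMap("rowStats", "processed=100"));
    return "SUCCESS";
  }
}
```

The runner that launched the task can then pick up the file at the known path and upload it alongside the task log, without inspecting the returned status.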
@jon-wei thanks. +1 after Travis.
 * TaskReport objects contain additional information about an indexing task, such as row statistics, errors, and
 * published segments. They are kept in deep storage along with task logs.
 */
/**
Please remove this.
Removed the extra /** */
Overall LGTM after the refactor 🤘
@@ -262,7 +262,7 @@ public void stop()
 }
 }
 final ListenableFuture<TaskStatus> statusFuture = exec.get(taskPriority)
-.submit(new ThreadPoolTaskRunnerCallable(
+.submit(new ThreadPoolTaskRunnerCallable(
Formatting looks off here
fixed formatting
@@ -187,6 +188,12 @@ public void configure(Binder binder)
        .setStatusFile(new File(taskAndStatusFile.get(1)))
);

binder.bind(TaskReportFileWriter.class).toInstance(
    new TaskReportFileWriter(
        new File(taskAndStatusFile.get(2))
Should you update the taskAndStatus arguments annotation to include this 3rd file? Also, maybe we should consider pulling these out into standalone variables at startup, like taskFileName, statusFileName, and reportFileName, so it's more obvious what they are?
Updated the annotation and made standalone variables for the file paths
…5524)
* Add support for task reports, upload reports to deep storage
* PR comments
* Better name for method
* Fix report file upload
* Use TaskReportFileWriter
* Checkstyle
* More PR comments
This PR allows indexing tasks to return a map of TaskReport objects along with the TaskStatus. A TaskReport will contain structured information about a task, such as row stats, parsing errors, or a list of published segments.
The TaskReports will be uploaded to deep storage along with the Task's stdout log, accessible via an overlord endpoint keyed by task ID.
This is intended to support PR #5418 and #5492 which will generate large blocks of structured information upon task completion.
This large TaskReport information is kept in deep storage only, to avoid storing large objects in the metadata storage or in ZooKeeper nodes (via RemoteTaskRunner).
ExecutorLifecycle will write the TaskReports to disk and strip out the TaskReport objects before returning a TaskStatus, and the deep storage upload is handled in ForkingTaskRunner like the log upload.