Skip to content

Commit

Permalink
[ML] Retry reset if existing task was terminated abruptly (#77328) (#…
Browse files Browse the repository at this point in the history
…77338)

The reset API waits for the existing reset task to finish
if the job is blocked on reset. If the existing task appears
to have finished, the API goes on to respond with success to the caller.

However, it appears that if the reset task was running on a node
that went down, the task looks finished even though it may not
have completed its work.

This commit adds logic to retry resetting the job if the existing
task appears finished but the job is still blocked on reset.

Relates #77207
  • Loading branch information
dimitris-athanasiou committed Sep 7, 2021
1 parent fa60a09 commit 2d9a218
Showing 1 changed file with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ protected void masterOperation(Task task, ResetJobAction.Request request, Cluste
}

if (job.getBlocked().getReason() == Blocked.Reason.RESET) {
waitExistingResetTaskToComplete(job.getBlocked().getTaskId(), request, listener);
waitExistingResetTaskToComplete(job.getBlocked().getTaskId(), request, ActionListener.wrap(
r -> resetIfJobIsStillBlockedOnReset(task, request, listener),
listener::onFailure
));
} else {
ParentTaskAssigningClient taskClient = new ParentTaskAssigningClient(client, taskId);
jobConfigProvider.updateJobBlockReason(job.getId(), new Blocked(Blocked.Reason.RESET, taskId), ActionListener.wrap(
Expand Down Expand Up @@ -146,6 +149,35 @@ private void waitExistingResetTaskToComplete(TaskId existingTaskId, ResetJobActi
));
}

/**
 * Fetches the job configuration again and acts on its current blocked reason:
 * <ul>
 *   <li>{@code NONE}: responds to the listener with {@link AcknowledgedResponse#TRUE};</li>
 *   <li>{@code RESET}: calls {@code resetJob} again, using this task as the parent task;</li>
 *   <li>any other reason: fails the listener with a conflict status exception.</li>
 * </ul>
 * A failure while fetching the job is forwarded to {@code listener} unchanged.
 *
 * @param task     the task running this request; it is cast to {@link CancellableTask} when the reset is retried
 * @param request  the reset request, which supplies the job id
 * @param listener notified with the outcome
 */
private void resetIfJobIsStillBlockedOnReset(Task task, ResetJobAction.Request request, ActionListener<AcknowledgedResponse> listener) {
    // Look the job up afresh so the decision below is based on its current blocked state.
    jobConfigProvider.getJob(request.getJobId(), ActionListener.wrap(
        jobBuilder -> {
            // NOTE(review): the reason is read once here; assumes the getters on the built Job are side-effect free.
            final Blocked.Reason currentReason = jobBuilder.build().getBlocked().getReason();

            if (currentReason == Blocked.Reason.NONE) {
                // The block has been lifted, so the earlier reset task ran to completion.
                logger.debug(() -> new ParameterizedMessage("[{}] Existing reset task finished successfully", request.getJobId()));
                listener.onResponse(AcknowledgedResponse.TRUE);
                return;
            }

            if (currentReason == Blocked.Reason.RESET) {
                // The block is still in place, so the earlier task went away without finishing; run the reset again.
                logger.debug(() -> new ParameterizedMessage("[{}] Existing reset task was interrupted; retrying reset",
                    request.getJobId()));
                TaskId thisTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
                ParentTaskAssigningClient parentAssigningClient = new ParentTaskAssigningClient(client, thisTaskId);
                resetJob(parentAssigningClient, (CancellableTask) task, request, listener);
                return;
            }

            // The job is now blocked for some other reason; report the conflict to the caller.
            listener.onFailure(ExceptionsHelper.conflictStatusException(
                "cannot reset job while it is blocked with [" + currentReason + "]"));
        },
        listener::onFailure
    ));
}

private void resetJob(ParentTaskAssigningClient taskClient, CancellableTask task, ResetJobAction.Request request,
ActionListener<AcknowledgedResponse> listener) {
String jobId = request.getJobId();
Expand Down

0 comments on commit 2d9a218

Please sign in to comment.