Skip to content

Commit

Permalink
[7.14][ML] Retain job task upon fail during opening (#77206) (#77239)
Browse files Browse the repository at this point in the history
A few failures that can occur in `OpenJobPersistentTasksExecutor`
while a job is being opened are not handled properly. Currently,
the persistent task is marked as failed, which effectively completes
the task. As a result, the job appears as `closed` and the failure
reason is lost.

This commit changes `OpenJobPersistentTasksExecutor` so that when
failures occur during job opening the job task is not completed.
Instead, the job task's state is updated to `failed` with a
corresponding reason.

Backport of #77206
  • Loading branch information
dimitris-athanasiou committed Sep 3, 2021
1 parent d1264ad commit 14850af
Showing 1 changed file with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara
)),
e -> {
logger.error(new ParameterizedMessage("[{}] Failed to update results mapping", params.getJobId()), e);
jobTask.markAsFailed(e);
failTask(jobTask, "failed to update results mapping");
}
);
// We need to update the results index as we MAY update the current forecast results, setting the running forecasts to failed
Expand Down Expand Up @@ -246,17 +246,38 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams

// This job has a running datafeed attached to it.
// In order to prevent gaps in the model we revert to the current snapshot deleting intervening results.
revertToCurrentSnapshot(jobTask.getJobId(), ActionListener.wrap(response -> openJob(jobTask), jobTask::markAsFailed));
revertToCurrentSnapshot(jobTask.getJobId(), ActionListener.wrap(
response -> openJob(jobTask),
e -> {
logger.error(new ParameterizedMessage("[{}] failed to revert to current snapshot", jobTask.getJobId()), e);
failTask(jobTask, "failed to revert to current snapshot");
}
));
} else {
openJob(jobTask);
}
},
jobTask::markAsFailed
e -> {
logger.error(new ParameterizedMessage("[{}] failed to search for associated datafeed", jobTask.getJobId()), e);
failTask(jobTask, "failed to search for associated datafeed");
}
);

hasRunningDatafeedTask(jobTask.getJobId(), hasRunningDatafeedTaskListener);
}

/**
 * Moves the job task into the {@code FAILED} state with the given reason instead of
 * completing the persistent task, so the failure reason stays attached to the task state.
 * <p>
 * If the state update itself fails, the error is logged and the task is marked as failed
 * via {@code markAsFailed} as a last resort.
 *
 * @param jobTask the job task whose persistent state should be set to failed
 * @param reason  short description of why the job failed; stored in the new task state
 */
private void failTask(JobTask jobTask, String reason) {
    final String jobId = jobTask.getJobId();
    jobTask.updatePersistentTaskState(
        new JobTaskState(JobState.FAILED, jobTask.getAllocationId(), reason),
        ActionListener.wrap(
            updatedTask -> logger.debug(() -> new ParameterizedMessage("[{}] updated task state to failed", jobId)),
            updateFailure -> {
                // The state could not be persisted; fall back to failing the persistent task itself.
                logger.error(
                    new ParameterizedMessage("[{}] error while setting task state to failed; marking task as failed", jobId),
                    updateFailure
                );
                jobTask.markAsFailed(updateFailure);
            }
        )
    );
}

/**
 * Checks whether the master node recorded in the current {@code clusterState} runs a
 * version that is on or after the supplied one.
 *
 * @param version the minimum version to compare the master node's version against
 * @return {@code true} if the master node's version is on or after {@code version}
 */
private boolean isMasterNodeVersionOnOrAfter(Version version) {
    // NOTE(review): getMasterNode() is dereferenced without a null check; confirm a master is always known here.
    final Version masterVersion = clusterState.nodes().getMasterNode().getVersion();
    return masterVersion.onOrAfter(version);
}
Expand Down Expand Up @@ -338,6 +359,10 @@ private void openJob(JobTask jobTask) {
if (unwrapped instanceof DocumentMissingException || unwrapped instanceof ResourceNotFoundException) {
jobTask.markAsCompleted();
} else {
// In this case we prefer to mark the task as failed, which means the job
// will appear closed. The reason is that the job closed successfully and
// we just failed to update some fields like the finish time. It is preferable
// to let the job close rather than set it to failed.
jobTask.markAsFailed(e);
}
}
Expand All @@ -346,7 +371,8 @@ private void openJob(JobTask jobTask) {
jobTask.markAsCompleted();
}
} else {
jobTask.markAsFailed(e2);
logger.error(new ParameterizedMessage("[{}] failed to open job", jobTask.getJobId()), e2);
failTask(jobTask, "failed to open job");
}
});
}
Expand Down

0 comments on commit 14850af

Please sign in to comment.