Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Ignore exceptions while opening job after SIGTERM to JVM #75850

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ public void killAllProcessesOnThisNode() {
}
}

public boolean isNodeDying() {
return nodeDying;
}

/**
* Makes open jobs on this node go through the motions of closing but
* without completing the persistent task and instead telling the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ public void validate(OpenJobAction.JobParams params, ClusterState clusterState)
}

@Override
// Exceptions that occur while the node is dying, i.e. after the JVM has received a SIGTERM,
// are ignored. Core services will be stopping in response to the SIGTERM and we want the
// job to try to open again on another node, not spuriously fail on the dying node.
protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobParams params, PersistentTaskState state) {
JobTask jobTask = (JobTask) task;
jobTask.setAutodetectProcessManager(autodetectProcessManager);
Expand All @@ -205,13 +208,17 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara
mappingsUpdate -> jobResultsProvider.setRunningForecastsToFailed(params.getJobId(), ActionListener.wrap(
r -> runJob(jobTask, jobState, params),
e -> {
logger.warn(new ParameterizedMessage("[{}] failed to set forecasts to failed", params.getJobId()), e);
runJob(jobTask, jobState, params);
if (autodetectProcessManager.isNodeDying() == false) {
logger.warn(new ParameterizedMessage("[{}] failed to set forecasts to failed", params.getJobId()), e);
runJob(jobTask, jobState, params);
}
}
)),
e -> {
logger.error(new ParameterizedMessage("[{}] Failed to update results mapping", params.getJobId()), e);
jobTask.markAsFailed(e);
if (autodetectProcessManager.isNodeDying() == false) {
logger.error(new ParameterizedMessage("[{}] Failed to update results mapping", params.getJobId()), e);
jobTask.markAsFailed(e);
}
}
);
// We need to update the results index as we MAY update the current forecast results, setting the running forcasts to failed
Expand All @@ -225,6 +232,9 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara
resultsMappingUpdateHandler);
}

// Exceptions that occur while the node is dying, i.e. after the JVM has received a SIGTERM,
// are ignored. Core services will be stopping in response to the SIGTERM and we want the
// job to try to open again on another node, not spuriously fail on the dying node.
private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams params) {
// If the job is closing, simply stop and return
if (JobState.CLOSING.equals(jobState)) {
Expand All @@ -246,15 +256,28 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams

// This job has a running datafeed attached to it.
// In order to prevent gaps in the model we revert to the current snapshot deleting intervening results.
revertToCurrentSnapshot(jobTask.getJobId(), ActionListener.wrap(response -> openJob(jobTask), jobTask::markAsFailed));
revertToCurrentSnapshot(jobTask.getJobId(), ActionListener.wrap(
response -> openJob(jobTask),
e -> {
if (autodetectProcessManager.isNodeDying() == false) {
jobTask.markAsFailed(e);
}
}
));
} else {
openJob(jobTask);
}
},
jobTask::markAsFailed
e -> {
if (autodetectProcessManager.isNodeDying() == false) {
jobTask.markAsFailed(e);
}
}
);

hasRunningDatafeedTask(jobTask.getJobId(), hasRunningDatafeedTaskListener);
if (autodetectProcessManager.isNodeDying() == false) {
hasRunningDatafeedTask(jobTask.getJobId(), hasRunningDatafeedTaskListener);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally would like there to be a predicate before hasRunningDatafeedTaskListener is even created. Similar to the FAILED check.

if (autodetectProcessManager.isNodeDying()) {
    return;
}

But that is not a huge deal, it just reads a little strange.

}

private boolean isMasterNodeVersionOnOrAfter(Version version) {
Expand Down Expand Up @@ -322,6 +345,9 @@ private void revertToCurrentSnapshot(String jobId, ActionListener<Boolean> liste
executeAsyncWithOrigin(client, ML_ORIGIN, GetJobsAction.INSTANCE, request, jobListener);
}

// Exceptions that occur while the node is dying, i.e. after the JVM has received a SIGTERM,
// are ignored. Core services will be stopping in response to the SIGTERM and we want the
// job to try to open again on another node, not spuriously fail on the dying node.
private void openJob(JobTask jobTask) {
String jobId = jobTask.getJobId();
autodetectProcessManager.openJob(jobTask, clusterState, PERSISTENT_TASK_MASTER_NODE_TIMEOUT, (e2, shouldFinalizeJob) -> {
Expand All @@ -337,19 +363,25 @@ private void openJob(JobTask jobTask) {
ActionListener.wrap(
response -> jobTask.markAsCompleted(),
e -> {
// This error is logged even if the node is dying. This is a nasty place for the node to get killed,
// as most of the job's close sequence has executed, just not the finalization step. The job will
// restart on a different node. If the coordinating node for the close request notices that the job
// changed nodes while waiting for it to close then it will remove the persistent task, which should
// stop the job doing anything significant on its new node. However, the finish time of the job will
// not be set correctly.
logger.error(new ParameterizedMessage("[{}] error finalizing job", jobId), e);
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof DocumentMissingException || unwrapped instanceof ResourceNotFoundException) {
jobTask.markAsCompleted();
} else {
} else if (autodetectProcessManager.isNodeDying() == false) {
jobTask.markAsFailed(e);
}
}
));
} else {
jobTask.markAsCompleted();
}
} else {
} else if (autodetectProcessManager.isNodeDying() == false) {
jobTask.markAsFailed(e2);
}
});
Expand Down