[ML] Fix race condition between job open, close and kill (#75113) (#75116)

This is a follow-up to #74976.

The changes of #74976 reverted many of the changes of #71656
because #74415 made them redundant. #74415 did this by marking
killed jobs as closing, so that the standard "job closed immediately
after open" functionality was used instead of reissuing the kill
immediately after opening. However, it turns out that this
"job closed immediately after open" functionality does not correctly
handle the case of a job that is killed while it is opening: it
causes AutodetectCommunicator.close() to be called instead of
AutodetectCommunicator.killProcess(). Both do much of the same work,
but AutodetectCommunicator.close() also finalizes the job, and this
can cause problems if the job is being killed as part of a feature
reset.
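
For illustration, here is a hypothetical sketch of that distinction
(not the real AutodetectCommunicator API; as the diff below shows,
the real killProcess() takes several flags):

```java
// Hypothetical sketch of the close-vs-kill distinction; not the real API.
interface ProcessShutdown {
    // Waits for the native process to finish and then finalizes the job,
    // e.g. persisting model state (the diff below notes this can take a
    // long time for a large model). During a feature reset, the indices
    // being written to are themselves going away, which is where the
    // problems arise.
    void close();

    // Terminates the native process without finalizing the job, so
    // nothing is persisted on behalf of a job that is being removed.
    void killProcess();
}
```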

This change reinstates some of the functionality of #71656
but in a different place that hopefully won't reintroduce the
problems that led to #74415.

We can detect that a kill happened early in an open or close
operation by checking whether the task's allocation ID has already
been removed from the map once ProcessContext.setDying() returns
true. A true return from setDying() means the job has not previously
been closed, so if its allocation ID is gone it must have been
killed. In that case we can call AutodetectCommunicator.killProcess()
instead of AutodetectCommunicator.close() during the cleanup that
happens when we detect that a recently started process is no
longer wanted.
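
As a minimal, self-contained sketch of that detection pattern (the
names ProcessContext, setDying and processByAllocation follow the
diff below, but the simplified kill and close bodies are illustrative
only, not the actual Elasticsearch implementation):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only - not the real AutodetectProcessManager code.
class KillDetectionSketch {

    static class ProcessContext {
        private final AtomicBoolean dying = new AtomicBoolean(false);

        // True only for the first caller: the job was not already
        // closing (or killed) when this call was made.
        boolean setDying() {
            return dying.compareAndSet(false, true);
        }
    }

    private final ConcurrentMap<Long, ProcessContext> processByAllocation =
        new ConcurrentHashMap<>();

    // A kill removes the allocation ID from the map straight away.
    void kill(long allocationId) {
        ProcessContext context = processByAllocation.get(allocationId);
        if (context != null) {
            context.setDying();
        }
        processByAllocation.remove(allocationId);
    }

    void closeProcessAndTask(ProcessContext context, long allocationId) {
        if (context.setDying() == false) {
            return; // a close was already in progress; nothing to do
        }
        // setDying() returned true, so the job was never closed. If its
        // allocation ID is nevertheless gone from the map, a kill must
        // have raced with this open/close.
        boolean jobKilled = processByAllocation.containsKey(allocationId) == false;
        if (jobKilled) {
            // clean up via the kill path (communicator.killProcess(...))
        } else {
            // normal close path (communicator.close()), then remove the entry
            processByAllocation.remove(allocationId);
        }
    }
}
```

The key design point is that the kill path removes the allocation ID
before cleanup runs, so an inexpensive containsKey check distinguishes
the two cases without making kills wait on the open/close lock.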

Relates #75069

Co-authored-by: David Roberts <dave.roberts@elastic.co>
elasticsearchmachine and droberts195 committed Jul 8, 2021
1 parent 73bbc68 commit 5182b9f
Showing 1 changed file with 32 additions and 9 deletions.
@@ -688,7 +688,6 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
}
return new AutodetectCommunicator(job, process, new StateStreamer(client), dataCountsReporter, processor, handler,
xContentRegistry, autodetectWorkerExecutor);

}

private void notifyLoadingSnapshot(String jobId, AutodetectParams autodetectParams) {
@@ -735,6 +734,12 @@ private Consumer<String> onProcessCrash(JobTask jobTask) {
private void closeProcessAndTask(ProcessContext processContext, JobTask jobTask, String reason) {
String jobId = jobTask.getJobId();
long allocationId = jobTask.getAllocationId();
// We use a lock to prevent simultaneous open and close from conflicting. However, we found
// that we could not use the lock to stop kill from conflicting because that could lead to
// a kill taking an unacceptably long time to have an effect, which largely defeats the point
// of having an option to quickly kill a process. Therefore we have to deal with the effects
// of kill running simultaneously with open and close.
boolean jobKilled = false;
processContext.tryLock();
try {
if (processContext.setDying() == false) {
@@ -744,29 +749,47 @@ private void closeProcessAndTask(ProcessContext processContext, JobTask jobTask,
return;
}

if (reason == null) {
// If the job was killed early on during its open sequence then
// its context will already have been removed from this map
jobKilled = (processByAllocation.containsKey(allocationId) == false);
if (jobKilled) {
logger.debug("[{}] Cleaning up job opened after kill", jobId);
} else if (reason == null) {
logger.info("Closing job [{}]", jobId);
} else {
logger.info("Closing job [{}], because [{}]", jobId, reason);
}

AutodetectCommunicator communicator = processContext.getAutodetectCommunicator();
if (communicator == null) {
assert jobKilled == false
: "Job " + jobId + " killed before process started yet still had no communicator during cleanup after process started";
logger.debug("Job [{}] is being closed before its process is started", jobId);
jobTask.markAsCompleted();
processByAllocation.remove(allocationId);
} else {
communicator.close();
if (jobKilled) {
communicator.killProcess(true, false, false);
} else {
// communicator.close() may take a long time to run, if the job persists a large model state as a
// result of calling it. We want to leave open the option to kill the job during this time, which
// is why the allocation ID must remain in the map until after the close is complete.
communicator.close();
processByAllocation.remove(allocationId);
}
}

processByAllocation.remove(allocationId);
} catch (Exception e) {
// If the close failed because the process has explicitly been killed by us then just pass on that exception
// If the close failed because the process has explicitly been killed by us then just pass on that exception.
// (Note that jobKilled may be false in this case, if the kill is executed while communicator.close() is running.)
if (e instanceof ElasticsearchStatusException && ((ElasticsearchStatusException) e).status() == RestStatus.CONFLICT) {
throw e;
logger.trace("[{}] Conflict between kill and close during autodetect process cleanup - job {} before cleanup started",
jobId, jobKilled ? "killed" : "not killed");
throw (ElasticsearchStatusException) e;
}
logger.warn("[" + jobId + "] Exception closing autodetect process", e);
String msg = jobKilled ? "Exception cleaning up autodetect process started after kill" : "Exception closing autodetect process";
logger.warn("[" + jobId + "] " + msg, e);
setJobState(jobTask, JobState.FAILED, e.getMessage());
throw ExceptionsHelper.serverError("Exception closing autodetect process", e);
throw ExceptionsHelper.serverError(msg, e);
} finally {
// to ensure the contract that multiple simultaneous close calls for the same job wait until
// the job is closed is honoured, hold the lock throughout the close procedure so that another
