Commit

[ML] Fix possible race condition when closing an opening job (elastic#42506)

This change fixes a race condition that would result in an
in-memory data structure becoming out-of-sync with persistent
tasks in cluster state.

If triggered repeatedly, this could make it impossible to open
any ML jobs on the affected node: the master node would think
the node had capacity to open another job, but the chosen node
would error during the open sequence because its in-memory
data structure was already full.

The race could be triggered by opening a job and then closing
it a tiny fraction of a second later.  It is unlikely a user
of the UI could open and close the job that fast, but a script
or program calling the REST API could.

The nasty thing is that, judging from the externally observable
states and stats, everything would appear to be fine: the fast
open-then-close sequence would appear to leave the job in the
closed state.  It is only later that the leftovers in the
in-memory data structure might build up and cause a problem.
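
To make the race concrete, the sketch below reproduces its shape outside
Elasticsearch.  It is not the project's code: the class, the Context holder,
the byAllocation map and the dummy process field are invented for illustration
and only stand in for the real ProcessContext / processByAllocation machinery.
The buggy variants check the per-job state before taking the lock and return
early from close when no process has started yet, so a fast open-then-close
strands an entry; the fixed variants mirror this commit by checking under the
lock, reporting whether a process was actually started, and always removing
the allocation on close.

// Sketch only -- all names below are made up; none of this is the real implementation.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

class OpenCloseRaceSketch {

    enum State { NOT_RUNNING, RUNNING, DYING }

    static final class Context {
        final ReentrantLock lock = new ReentrantLock();
        volatile State state = State.NOT_RUNNING;
        volatile Object process;                     // stands in for the native process/communicator
    }

    private final ConcurrentHashMap<Long, Context> byAllocation = new ConcurrentHashMap<>();

    // Buggy open: the state is checked before the lock is taken and never re-checked,
    // so a close() that runs in between does not stop the process from being started.
    void openBuggy(long allocationId) {
        Context ctx = byAllocation.computeIfAbsent(allocationId, k -> new Context());
        if (ctx.state != State.NOT_RUNNING) {
            return;                                  // check happens here ...
        }
        ctx.lock.lock();                             // ... but close() may sneak in before this
        try {
            ctx.process = new Object();              // "start the process" even if ctx is DYING
            ctx.state = State.RUNNING;
        } finally {
            ctx.lock.unlock();
        }
    }

    // Buggy close: nothing has started yet, so we return early -- and skip the map
    // clean-up, leaving byAllocation out of sync with what the cluster thinks is open.
    void closeBuggy(long allocationId) {
        Context ctx = byAllocation.get(allocationId);
        if (ctx == null) {
            return;
        }
        ctx.lock.lock();
        try {
            ctx.state = State.DYING;
            if (ctx.process == null) {
                return;                              // leak: the entry stays in byAllocation
            }
            ctx.process = null;                      // "close the process"
            byAllocation.remove(allocationId);
        } finally {
            ctx.lock.unlock();
        }
    }

    // Fixed open: the check happens under the same lock close() takes, and the caller
    // is told whether anything was started (mirrors createProcessAndSetRunning now
    // returning boolean).
    boolean openFixed(long allocationId) {
        Context ctx = byAllocation.computeIfAbsent(allocationId, k -> new Context());
        ctx.lock.lock();
        try {
            if (ctx.state != State.NOT_RUNNING) {
                return false;                        // close() won the race; start nothing
            }
            ctx.process = new Object();
            ctx.state = State.RUNNING;
            return true;
        } finally {
            ctx.lock.unlock();
        }
    }

    // Fixed close: whether or not a process exists, the allocation is always removed,
    // so the in-memory view cannot fill up with dead entries.
    void closeFixed(long allocationId) {
        Context ctx = byAllocation.get(allocationId);
        if (ctx == null) {
            return;
        }
        ctx.lock.lock();
        try {
            ctx.state = State.DYING;
            if (ctx.process != null) {
                ctx.process = null;
            }
            byAllocation.remove(allocationId);
        } finally {
            ctx.lock.unlock();
        }
    }
}
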
droberts195 authored and Gurkan Kaymak committed May 27, 2019
1 parent 820de40 commit cf58f56
Showing 1 changed file with 19 additions and 15 deletions.

@@ -401,16 +401,12 @@ protected void doRun() {
         logger.debug("Aborted opening job [{}] as it has been closed", jobId);
         return;
     }
-    if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) {
-        logger.debug("Cannot open job [{}] when its state is [{}]",
-            jobId, processContext.getState().getClass().getName());
-        return;
-    }
 
     try {
-        createProcessAndSetRunning(processContext, job, params, closeHandler);
-        processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot());
-        setJobState(jobTask, JobState.OPENED);
+        if (createProcessAndSetRunning(processContext, job, params, closeHandler)) {
+            processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot());
+            setJobState(jobTask, JobState.OPENED);
+        }
     } catch (Exception e1) {
         // No need to log here as the persistent task framework will log it
         try {
@@ -447,19 +443,25 @@ protected void doRun() {
             ElasticsearchMappings::resultsMapping, client, clusterState, resultsMappingUpdateHandler);
     }
 
-    private void createProcessAndSetRunning(ProcessContext processContext,
-                                            Job job,
-                                            AutodetectParams params,
-                                            BiConsumer<Exception, Boolean> handler) throws IOException {
+    private boolean createProcessAndSetRunning(ProcessContext processContext,
+                                               Job job,
+                                               AutodetectParams params,
+                                               BiConsumer<Exception, Boolean> handler) throws IOException {
         // At this point we lock the process context until the process has been started.
         // The reason behind this is to ensure closing the job does not happen before
         // the process is started as that can result to the job getting seemingly closed
         // but the actual process is hanging alive.
         processContext.tryLock();
         try {
+            if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) {
+                logger.debug("Cannot open job [{}] when its state is [{}]",
+                    job.getId(), processContext.getState().getClass().getName());
+                return false;
+            }
             AutodetectCommunicator communicator = create(processContext.getJobTask(), job, params, handler);
             communicator.writeHeader();
             processContext.setRunning(communicator);
+            return true;
         } finally {
             // Now that the process is running and we have updated its state we can unlock.
             // It is important to unlock before we initialize the communicator (ie. load the model state)
@@ -592,6 +594,8 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) {
         try {
             if (processContext.setDying() == false) {
                 logger.debug("Cannot close job [{}] as it has been marked as dying", jobId);
+                // The only way we can get here is if 2 close requests are made very close together.
+                // The other close has done the work so it's safe to return here without doing anything.
                 return;
             }
 
@@ -605,10 +609,10 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) {
             if (communicator == null) {
                 logger.debug("Job [{}] is being closed before its process is started", jobId);
                 jobTask.markAsCompleted();
-                return;
+            } else {
+                communicator.close(restart, reason);
             }
 
-            communicator.close(restart, reason);
             processByAllocation.remove(allocationId);
         } catch (Exception e) {
             // If the close failed because the process has explicitly been killed by us then just pass on that exception
@@ -628,7 +632,7 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) {
         try {
             nativeStorageProvider.cleanupLocalTmpStorage(jobTask.getDescription());
         } catch (IOException e) {
-            logger.error(new ParameterizedMessage("[{}]Failed to delete temporary files", jobId), e);
+            logger.error(new ParameterizedMessage("[{}] Failed to delete temporary files", jobId), e);
         }
     }
