[ML] Fix race condition when stopping a recently relocated datafeed (#84636)

It was possible that a request to stop a datafeed that had very recently
relocated from one node to another could get ignored and leave the
datafeed running.

This PR fixes the problem by closing a loophole where a stop request
would not get recorded if the DatafeedRunner for the datafeed was set but
its DatafeedRunner.Holder was not. Now we always record that a datafeed
task has been told to stop, and abort the startup process when we see this
has happened.

Relates #81649
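
For context, the essence of the change is a single check-and-record guard on the datafeed task: stop and isolate requests are always recorded, and any startup work runs under the same lock only while the recorded state is still NEITHER. Below is a minimal, hypothetical sketch of that pattern in plain Java; the names mirror the production DatafeedTask and StoppedOrIsolated types, but it is an illustration written for this summary, not the actual Elasticsearch implementation.

// Minimal sketch of the stop/isolate guard (illustrative only; not the Elasticsearch code).
import java.util.Objects;

class DatafeedTaskSketch {

    enum StoppedOrIsolated { NEITHER, ISOLATED, STOPPED }

    private Runnable datafeedRunner; // stands in for the real DatafeedRunner
    private StoppedOrIsolated stoppedOrIsolated = StoppedOrIsolated.NEITHER;

    // Startup work runs only while the task is still in the NEITHER state. Because this
    // method shares a monitor with stop(), a stop request cannot slip in between the
    // check and the work.
    synchronized StoppedOrIsolated executeIfNotStoppedOrIsolated(Runnable startupWork) {
        if (stoppedOrIsolated == StoppedOrIsolated.NEITHER) {
            startupWork.run();
        }
        return stoppedOrIsolated;
    }

    // Setting the runner is just one more piece of guarded startup work.
    synchronized StoppedOrIsolated setDatafeedRunner(Runnable runner) {
        return executeIfNotStoppedOrIsolated(() -> this.datafeedRunner = Objects.requireNonNull(runner));
    }

    // A stop request is always recorded, even when the runner is already set but its Holder
    // has not been registered yet; previously the state was only recorded while the runner
    // was still null, so a stop arriving in that window could be lost.
    void stop() {
        synchronized (this) {
            stoppedOrIsolated = StoppedOrIsolated.STOPPED;
            if (datafeedRunner == null) {
                return;
            }
        }
        datafeedRunner.run(); // stand-in for DatafeedRunner.stopDatafeed(this, reason, timeout)
    }
}

A caller such as DatafeedRunner.run can then register its Holder through the same guard and abort startup whenever the returned value is not NEITHER, which is exactly what the runner change in the diff below does.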
droberts195 committed Mar 3, 2022
1 parent d081f14 commit 6e1abdf
Showing 5 changed files with 85 additions and 70 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/84636.yaml
@@ -0,0 +1,5 @@
pr: 84636
summary: Fix race condition when stopping a recently relocated datafeed
area: Machine Learning
type: bug
issues: []
(next changed file)
@@ -563,7 +563,7 @@ protected AllocatedPersistentTask createTask(

public static class DatafeedTask extends AllocatedPersistentTask implements StartDatafeedAction.DatafeedTaskMatcher {

public enum StoppedOrIsolatedBeforeRunning {
public enum StoppedOrIsolated {
NEITHER,
ISOLATED,
STOPPED
@@ -577,7 +577,7 @@ public enum StoppedOrIsolatedBeforeRunning {
* the value of the {@code stoppedOrIsolatedBeforeRunning} flag.
*/
private DatafeedRunner datafeedRunner;
private StoppedOrIsolatedBeforeRunning stoppedOrIsolatedBeforeRunning = StoppedOrIsolatedBeforeRunning.NEITHER;
private StoppedOrIsolated stoppedOrIsolated = StoppedOrIsolated.NEITHER;

DatafeedTask(
long id,
@@ -612,16 +612,27 @@ public boolean isLookbackOnly() {

/**
* Set the datafeed runner <em>if</em> the task has not already been told to stop or isolate.
* @return A {@link StoppedOrIsolatedBeforeRunning} object that indicates whether the
* @return A {@link StoppedOrIsolated} object that indicates whether the
* datafeed task had previously been told to stop or isolate. {@code datafeedRunner}
* will only be set to the supplied value if the return value of this method is
* {@link StoppedOrIsolatedBeforeRunning#NEITHER}.
* {@link StoppedOrIsolated#NEITHER}.
*/
synchronized StoppedOrIsolatedBeforeRunning setDatafeedRunner(DatafeedRunner datafeedRunner) {
if (stoppedOrIsolatedBeforeRunning == StoppedOrIsolatedBeforeRunning.NEITHER) {
this.datafeedRunner = Objects.requireNonNull(datafeedRunner);
StoppedOrIsolated setDatafeedRunner(DatafeedRunner datafeedRunner) {
return executeIfNotStoppedOrIsolated(() -> this.datafeedRunner = Objects.requireNonNull(datafeedRunner));
}

/**
* Run a command <em>if</em> the task has not already been told to stop or isolate.
* @param runnable The command to run.
* @return A {@link StoppedOrIsolated} object that indicates whether the datafeed task
* had previously been told to stop or isolate. {@code runnable} will only be
* run if the return value of this method is {@link StoppedOrIsolated#NEITHER}.
*/
public synchronized StoppedOrIsolated executeIfNotStoppedOrIsolated(Runnable runnable) {
if (stoppedOrIsolated == StoppedOrIsolated.NEITHER) {
runnable.run();
}
return stoppedOrIsolatedBeforeRunning;
return stoppedOrIsolated;
}

@Override
@@ -641,32 +652,32 @@ public boolean shouldCancelChildrenOnCancellation() {

public void stop(String reason, TimeValue timeout) {
synchronized (this) {
stoppedOrIsolated = StoppedOrIsolated.STOPPED;
if (datafeedRunner == null) {
stoppedOrIsolatedBeforeRunning = StoppedOrIsolatedBeforeRunning.STOPPED;
return;
}
}
datafeedRunner.stopDatafeed(this, reason, timeout);
}

public synchronized StoppedOrIsolatedBeforeRunning getStoppedOrIsolatedBeforeRunning() {
return stoppedOrIsolatedBeforeRunning;
public synchronized StoppedOrIsolated getStoppedOrIsolated() {
return stoppedOrIsolated;
}

public void isolate() {
synchronized (this) {
// Stopped takes precedence over isolated for what we report externally,
// as stopped needs to cause the persistent task to be marked as completed
// (regardless of whether it was isolated) whereas isolated but not stopped
// mustn't do this.
if (stoppedOrIsolated == StoppedOrIsolated.NEITHER) {
stoppedOrIsolated = StoppedOrIsolated.ISOLATED;
}
if (datafeedRunner == null) {
// Stopped takes precedence over isolated for what we report externally,
// as stopped needs to cause the persistent task to be marked as completed
// (regardless of whether it was isolated) whereas isolated but not stopped
// mustn't do this.
if (stoppedOrIsolatedBeforeRunning == StoppedOrIsolatedBeforeRunning.NEITHER) {
stoppedOrIsolatedBeforeRunning = StoppedOrIsolatedBeforeRunning.ISOLATED;
}
return;
}
}
datafeedRunner.isolateDatafeed(getAllocationId());
datafeedRunner.isolateDatafeed(this);
}

void completeOrFailIfRequired(Exception error) {
(next changed file)
@@ -35,7 +35,7 @@
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction.DatafeedTask.StoppedOrIsolatedBeforeRunning;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction.DatafeedTask.StoppedOrIsolated;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;

@@ -97,9 +97,11 @@ public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer<Excepti
ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(datafeedJob -> {
String jobId = datafeedJob.getJobId();
Holder holder = new Holder(task, task.getDatafeedId(), datafeedJob, new ProblemTracker(auditor, jobId), finishHandler);
if (task.getStoppedOrIsolatedBeforeRunning() == StoppedOrIsolatedBeforeRunning.NEITHER) {
runningDatafeedsOnThisNode.put(task.getAllocationId(), holder);
task.updatePersistentTaskState(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() {
StoppedOrIsolated stoppedOrIsolated = task.executeIfNotStoppedOrIsolated(
() -> runningDatafeedsOnThisNode.put(task.getAllocationId(), holder)
);
if (stoppedOrIsolated == StoppedOrIsolated.NEITHER) {
task.updatePersistentTaskState(DatafeedState.STARTED, new ActionListener<>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
taskRunner.runWhenJobIsOpened(task, jobId);
@@ -121,20 +123,21 @@ public void onFailure(Exception e) {
logger.info(
"[{}] Datafeed has been {} before running",
task.getDatafeedId(),
task.getStoppedOrIsolatedBeforeRunning().toString().toLowerCase(Locale.ROOT)
stoppedOrIsolated.toString().toLowerCase(Locale.ROOT)
);
finishHandler.accept(null);
}
}, finishHandler);

ActionListener<DatafeedContext> datafeedContextListener = ActionListener.wrap(datafeedContext -> {
if (task.getStoppedOrIsolatedBeforeRunning() == StoppedOrIsolatedBeforeRunning.NEITHER) {
StoppedOrIsolated stoppedOrIsolated = task.getStoppedOrIsolated();
if (stoppedOrIsolated == StoppedOrIsolated.NEITHER) {
datafeedJobBuilder.build(task, datafeedContext, datafeedJobHandler);
} else {
logger.info(
"[{}] Datafeed has been {} while building context",
task.getDatafeedId(),
task.getStoppedOrIsolatedBeforeRunning().toString().toLowerCase(Locale.ROOT)
stoppedOrIsolated.toString().toLowerCase(Locale.ROOT)
);
finishHandler.accept(null);
}
@@ -180,10 +183,10 @@ public void prepareForImmediateShutdown() {
}
}

public void isolateDatafeed(long allocationId) {
public void isolateDatafeed(TransportStartDatafeedAction.DatafeedTask task) {
// This calls get() rather than remove() because we expect that the persistent task will
// be removed shortly afterwards and that operation needs to be able to find the holder
Holder holder = runningDatafeedsOnThisNode.get(allocationId);
Holder holder = runningDatafeedsOnThisNode.get(task.getAllocationId());
if (holder != null) {
holder.isolateDatafeed();
}
@@ -372,8 +375,8 @@ private TimeValue computeNextDelay(long next) {
/**
* Visible for testing
*/
boolean isRunning(long allocationId) {
return runningDatafeedsOnThisNode.containsKey(allocationId);
boolean isRunning(TransportStartDatafeedAction.DatafeedTask task) {
return runningDatafeedsOnThisNode.containsKey(task.getAllocationId());
}

public class Holder {
@@ -582,33 +585,27 @@ public void onResponse(PersistentTask<StartDatafeedAction.DatafeedParams> persis
for the close job api call.
*/
closeJobRequest.setLocal(true);
executeAsyncWithOrigin(
client,
ML_ORIGIN,
CloseJobAction.INSTANCE,
closeJobRequest,
new ActionListener<CloseJobAction.Response>() {

@Override
public void onResponse(CloseJobAction.Response response) {
if (response.isClosed() == false) {
logger.error("[{}] job close action was not acknowledged", getJobId());
}
executeAsyncWithOrigin(client, ML_ORIGIN, CloseJobAction.INSTANCE, closeJobRequest, new ActionListener<>() {

@Override
public void onResponse(CloseJobAction.Response response) {
if (response.isClosed() == false) {
logger.error("[{}] job close action was not acknowledged", getJobId());
}
}

@Override
public void onFailure(Exception e) {
// Given that the UI force-deletes the datafeed and then force-deletes the job, it's
// quite likely that the auto-close here will get interrupted by a process kill request,
// and it's misleading/worrying to log an error in this case.
if (e instanceof ElasticsearchStatusException exception && exception.status() == RestStatus.CONFLICT) {
logger.debug("[{}] {}", getJobId(), e.getMessage());
} else {
logger.error("[" + getJobId() + "] failed to auto-close job", e);
}
@Override
public void onFailure(Exception e) {
// Given that the UI force-deletes the datafeed and then force-deletes the job, it's
// quite likely that the auto-close here will get interrupted by a process kill request,
// and it's misleading/worrying to log an error in this case.
if (e instanceof ElasticsearchStatusException exception && exception.status() == RestStatus.CONFLICT) {
logger.debug("[{}] {}", getJobId(), e.getMessage());
} else {
logger.error("[" + getJobId() + "] failed to auto-close job", e);
}
}
);
});
}

@Override
(next changed file)
@@ -177,10 +177,7 @@ public static TransportStartDatafeedAction.DatafeedTask createDatafeedTask(
params,
Collections.emptyMap()
);
assertThat(
task.setDatafeedRunner(datafeedRunner),
is(TransportStartDatafeedAction.DatafeedTask.StoppedOrIsolatedBeforeRunning.NEITHER)
);
assertThat(task.setDatafeedRunner(datafeedRunner), is(TransportStartDatafeedAction.DatafeedTask.StoppedOrIsolated.NEITHER));
return task;
}
}
(next changed file)
@@ -81,7 +81,7 @@ public class DatafeedRunnerTests extends ESTestCase {
private DatafeedJobBuilder datafeedJobBuilder;
private long currentTime = 120000;
private AnomalyDetectionAuditor auditor;
private ArgumentCaptor<ClusterStateListener> capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class);
private final ArgumentCaptor<ClusterStateListener> capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class);
private AtomicBoolean hasOpenAutodetectCommunicator;

@Before
@@ -132,8 +132,7 @@ public void setUpTests() {
when(datafeedJob.getMaxEmptySearches()).thenReturn(null);
datafeedJobBuilder = mock(DatafeedJobBuilder.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("rawtypes")
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
ActionListener<DatafeedJob> listener = (ActionListener<DatafeedJob>) invocationOnMock.getArguments()[2];
listener.onResponse(datafeedJob);
return null;
}).when(datafeedJobBuilder).build(any(), any(), any());
@@ -232,7 +231,7 @@ public void testRealTime_GivenStoppingAnalysisProblem() throws Exception {
verify(handler).accept(analysisProblemCaptor.capture());
assertThat(analysisProblemCaptor.getValue().getCause(), equalTo(cause));
verify(auditor).error(JOB_ID, "Datafeed is encountering errors submitting data for analysis: stopping");
assertThat(datafeedRunner.isRunning(task.getAllocationId()), is(false));
assertThat(datafeedRunner.isRunning(task), is(false));
}

public void testRealTime_GivenNonStoppingAnalysisProblem() throws Exception {
@@ -248,7 +247,7 @@ public void testRealTime_GivenNonStoppingAnalysisProblem() throws Exception {
datafeedRunner.run(task, handler);

verify(auditor).error(JOB_ID, "Datafeed is encountering errors submitting data for analysis: non-stopping");
assertThat(datafeedRunner.isRunning(task.getAllocationId()), is(true));
assertThat(datafeedRunner.isRunning(task), is(true));
}

public void testStart_GivenNewlyCreatedJobLookBackAndRealtime() throws Exception {
@@ -266,10 +265,10 @@ public void testStart_GivenNewlyCreatedJobLookBackAndRealtime() throws Exception
if (cancelled) {
task.stop("test", StopDatafeedAction.DEFAULT_TIMEOUT);
verify(handler).accept(null);
assertThat(datafeedRunner.isRunning(task.getAllocationId()), is(false));
assertThat(datafeedRunner.isRunning(task), is(false));
} else {
verify(threadPool, times(1)).schedule(any(), eq(new TimeValue(1)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME));
assertThat(datafeedRunner.isRunning(task.getAllocationId()), is(true));
assertThat(datafeedRunner.isRunning(task), is(true));
}
}

@@ -454,7 +453,7 @@ public void testDatafeedGetsStoppedWhileStarting() {

Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask(DATAFEED_ID, 0L, 60000L);
when(task.getStoppedOrIsolatedBeforeRunning()).thenReturn(DatafeedTask.StoppedOrIsolatedBeforeRunning.STOPPED);
when(task.getStoppedOrIsolated()).thenReturn(DatafeedTask.StoppedOrIsolated.STOPPED);
datafeedRunner.run(task, handler);

// Verify datafeed aborted after creating context but before doing anything else
@@ -483,18 +482,24 @@ public static Job.Builder createDatafeedJob() {
return builder;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings("unchecked")
private static DatafeedTask createDatafeedTask(String datafeedId, long startTime, Long endTime) {
DatafeedTask task = mock(DatafeedTask.class);
final DatafeedTask.StoppedOrIsolated stoppedOrIsolated = DatafeedTask.StoppedOrIsolated.NEITHER;
when(task.getDatafeedId()).thenReturn(datafeedId);
when(task.getDatafeedStartTime()).thenReturn(startTime);
when(task.getEndTime()).thenReturn(endTime);
doAnswer(invocationOnMock -> {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1];
ActionListener<PersistentTask<?>> listener = (ActionListener<PersistentTask<?>>) invocationOnMock.getArguments()[1];
listener.onResponse(mock(PersistentTask.class));
return null;
}).when(task).updatePersistentTaskState(any(), any());
when(task.getStoppedOrIsolatedBeforeRunning()).thenReturn(DatafeedTask.StoppedOrIsolatedBeforeRunning.NEITHER);
when(task.getStoppedOrIsolated()).thenReturn(stoppedOrIsolated);
doAnswer(invocationOnMock -> {
Runnable runnable = (Runnable) invocationOnMock.getArguments()[0];
runnable.run();
return stoppedOrIsolated;
}).when(task).executeIfNotStoppedOrIsolated(any(Runnable.class));
return task;
}

@@ -503,11 +508,11 @@ private Consumer<Exception> mockConsumer() {
return mock(Consumer.class);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings("unchecked")
private DatafeedTask spyDatafeedTask(DatafeedTask task) {
task = spy(task);
doAnswer(invocationOnMock -> {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1];
ActionListener<PersistentTask<?>> listener = (ActionListener<PersistentTask<?>>) invocationOnMock.getArguments()[1];
listener.onResponse(mock(PersistentTask.class));
return null;
}).when(task).updatePersistentTaskState(any(), any());