Skip to content

Commit

Permalink
[ML] Fix minor race condition in dataframe analytics _stop (#53029)
Browse files Browse the repository at this point in the history
Tests have been periodically failing due to a race condition when checking a recently `STOPPED` task's state. The `.ml-state` index is not created until the task has already been transitioned to `STARTED`. This allows the `_start` API call to return. But, if a user (or test) immediately attempts to `_stop` that job, the job could stop and the task could be removed BEFORE the `.ml-state|stats` indices are created/updated.

This change moves towards the task cleaning up after itself in its main execution thread. `stop` flips the flag of the task to `isStopping`, and we now check `isStopping` in every method where it is necessary, allowing the task to stop gracefully.

closes #53007
  • Loading branch information
benwtrent committed Mar 5, 2020
1 parent dda18fa commit 932f81b
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Ex
"Finished analysis");
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53007")
public void testStopOutlierDetectionWithEnoughDocumentsToScroll() throws Exception {
String sourceIndex = "test-stop-outlier-detection-with-enough-docs-to-scroll";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
Expand Down Expand Up @@ -510,6 +511,10 @@ static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat
String[] concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indexNames);
List<String> unavailableIndices = new ArrayList<>(concreteIndices.length);
for (String index : concreteIndices) {
// This is OK as indices are created on demand
if (clusterState.metaData().hasIndex(index) == false) {
continue;
}
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index);
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
unavailableIndices.add(index);
Expand Down Expand Up @@ -572,7 +577,11 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(StartDataFrameAnal
String id = params.getId();

List<String> unavailableIndices =
verifyIndicesPrimaryShardsAreActive(clusterState, resolver, AnomalyDetectorsIndex.configIndexName());
verifyIndicesPrimaryShardsAreActive(clusterState,
resolver,
AnomalyDetectorsIndex.configIndexName(),
MlStatsIndex.indexPattern(),
AnomalyDetectorsIndex.jobStateIndexPattern());
if (unavailableIndices.size() != 0) {
String reason = "Not opening data frame analytics job [" + id +
"], because not all primary shards are active for the following indices [" + String.join(",", unavailableIndices) + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ private void createStatsIndexAndUpdateMappingsIfNecessary(ClusterState clusterSt
}

private void executeStartingJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
if (task.isStopping()) {
LOGGER.debug("[{}] task is stopping. Marking as complete before starting job.", task.getParams().getId());
task.markAsCompleted();
return;
}
DataFrameAnalyticsTaskState reindexingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.REINDEXING,
task.getAllocationId(), null);
DataFrameAnalyticsTask.StartingState startingState = DataFrameAnalyticsTask.determineStartingState(
Expand Down Expand Up @@ -163,6 +168,11 @@ private void executeStartingJob(DataFrameAnalyticsTask task, DataFrameAnalyticsC
}

private void executeJobInMiddleOfReindexing(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
if (task.isStopping()) {
LOGGER.debug("[{}] task is stopping. Marking as complete before restarting reindexing.", task.getParams().getId());
task.markAsCompleted();
return;
}
ClientHelper.executeAsyncWithOrigin(client,
ML_ORIGIN,
DeleteIndexAction.INSTANCE,
Expand All @@ -182,16 +192,21 @@ private void executeJobInMiddleOfReindexing(DataFrameAnalyticsTask task, DataFra

private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
if (task.isStopping()) {
// The task was requested to stop before we started reindexing
LOGGER.debug("[{}] task is stopping. Marking as complete before starting reindexing and analysis.",
task.getParams().getId());
task.markAsCompleted();
return;
}

// Reindexing is complete; start analytics
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
reindexResponse -> {
// If the reindex task is canceled, this listener is called.
// Consequently, we should not signal reindex completion.
if (task.isStopping()) {
LOGGER.debug("[{}] Stopping before starting analytics process", config.getId());
LOGGER.debug("[{}] task is stopping. Marking as complete before marking reindex as finished.",
task.getParams().getId());
task.markAsCompleted();
return;
}
task.setReindexingTaskId(null);
Expand Down Expand Up @@ -256,13 +271,26 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
}

private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
if (task.isStopping()) {
LOGGER.debug("[{}] task is stopping. Marking as complete before starting analysis.", task.getParams().getId());
task.markAsCompleted();
return;
}
// Update state to ANALYZING and start process
ActionListener<DataFrameDataExtractorFactory> dataExtractorFactoryListener = ActionListener.wrap(
dataExtractorFactory -> {
DataFrameAnalyticsTaskState analyzingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.ANALYZING,
task.getAllocationId(), null);
task.updatePersistentTaskState(analyzingState, ActionListener.wrap(
updatedTask -> processManager.runJob(task, config, dataExtractorFactory),
updatedTask -> {
if (task.isStopping()) {
LOGGER.debug("[{}] task is stopping. Marking as complete before starting native process.",
task.getParams().getId());
task.markAsCompleted();
return;
}
processManager.runJob(task, config, dataExtractorFactory);
},
error -> {
Throwable cause = ExceptionsHelper.unwrapCause(error);
if (cause instanceof ResourceNotFoundException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
ProcessContext processContext = new ProcessContext(config);
synchronized (processContextByAllocation) {
if (task.isStopping()) {
LOGGER.debug("[{}] task is stopping. Marking as complete before creating process context.",
task.getParams().getId());
// The task was requested to stop before we created the process context
auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
task.markAsCompleted();
Expand Down Expand Up @@ -329,7 +331,6 @@ public void stop(DataFrameAnalyticsTask task) {
processContext.stop();
} else {
LOGGER.debug("[{}] No process context to stop", task.getParams().getId());
task.markAsCompleted();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
*/
package org.elasticsearch.xpack.ml.dataframe.process;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetectionTests;
Expand All @@ -27,6 +29,7 @@
import org.junit.Before;
import org.mockito.InOrder;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -107,12 +110,15 @@ public void setUpMocks() {

public void testRunJob_TaskIsStopping() {
    // Scenario: the task has already been asked to stop by the time runJob is invoked.
    StartDataFrameAnalyticsAction.TaskParams stoppingTaskParams =
        new StartDataFrameAnalyticsAction.TaskParams("data_frame_id", Version.CURRENT, Collections.emptyList(), false);
    when(task.isStopping()).thenReturn(true);
    when(task.getParams()).thenReturn(stoppingTaskParams);

    processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory);

    // No process context should have been registered for the stopping task.
    assertThat(processManager.getProcessContextCount(), equalTo(0));

    // The task is expected to be touched exactly three times, in this order:
    // the stopping check, the params lookup, and finally the completion call.
    InOrder taskInteractions = inOrder(task);
    taskInteractions.verify(task).isStopping();
    taskInteractions.verify(task).getParams();
    taskInteractions.verify(task).markAsCompleted();
    verifyNoMoreInteractions(task);
}
Expand Down

0 comments on commit 932f81b

Please sign in to comment.