Skip to content

Commit

Permalink
[7.7][ML] Allow force stopping failed and stopping DF analytics (#54650
Browse files Browse the repository at this point in the history
…) (#54715)

Force stopping a failed job used to work but it
now puts the job in `stopping` state and hangs.
In addition, force stopping a `stopping` job is
not handled.

This commit addresses those issues with force
stopping data frame analytics. It aligns the
approach with the one followed for anomaly
detection jobs.

Backport of #54650
  • Loading branch information
dimitris-athanasiou committed Apr 3, 2020
1 parent 38f4b03 commit 8579eda
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ public static String dataFrameAnalyticsTaskId(String id) {
return DATA_FRAME_ANALYTICS_TASK_ID_PREFIX + id;
}

/**
 * Extracts the data frame analytics id from its persistent task id by
 * stripping the well-known prefix.
 *
 * Note: the previous implementation used {@code String.replaceFirst}, which
 * interprets its first argument as a regex and is not anchored to the start
 * of the string, so it would also strip the prefix if it appeared mid-string.
 * An explicit startsWith + substring removes exactly the leading prefix.
 *
 * @param taskId the persistent task id, e.g. {@code data_frame_analytics-foo}
 * @return the analytics id, or the input unchanged if it does not carry the prefix
 */
public static String dataFrameAnalyticsIdFromTaskId(String taskId) {
    if (taskId.startsWith(DATA_FRAME_ANALYTICS_TASK_ID_PREFIX)) {
        return taskId.substring(DATA_FRAME_ANALYTICS_TASK_ID_PREFIX.length());
    }
    return taskId;
}

@Nullable
public static PersistentTasksCustomMetaData.PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
return tasks == null ? null : tasks.getTask(jobTaskId(jobId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,6 @@ public void testUnallocatedDatafeedIds() {
containsInAnyOrder("datafeed_without_assignment", "datafeed_without_node"));
}

public void testDataFrameAnalyticsTaskIds() {
    // Round-trip: analytics id -> task id -> analytics id.
    String analyticsId = "foo";
    String taskId = MlTasks.dataFrameAnalyticsTaskId(analyticsId);
    assertThat(taskId, equalTo("data_frame_analytics-foo"));
    String recoveredId = MlTasks.dataFrameAnalyticsIdFromTaskId(taskId);
    assertThat(recoveredId, equalTo("foo"));
}

public void testGetDataFrameAnalyticsState_GivenNullTask() {
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(null);
assertThat(state, equalTo(DataFrameAnalyticsState.STOPPED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
Expand All @@ -21,6 +22,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
Expand All @@ -40,13 +42,14 @@
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Stops the persistent task for running data frame analytics.
Expand Down Expand Up @@ -93,73 +96,27 @@ protected void doExecute(Task task, StopDataFrameAnalyticsAction.Request request
ActionListener<Set<String>> expandedIdsListener = ActionListener.wrap(
expandedIds -> {
logger.debug("Resolved data frame analytics to stop: {}", expandedIds);
if (expandedIds.isEmpty()) {
listener.onResponse(new StopDataFrameAnalyticsAction.Response(true));
return;
}

PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Set<String> analyticsToStop = findAnalyticsToStop(tasks, expandedIds, request.isForce());
request.setExpandedIds(analyticsToStop);
request.setNodes(findAllocatedNodesAndRemoveUnassignedTasks(analyticsToStop, tasks));
AnalyticsByTaskState analyticsByTaskState = AnalyticsByTaskState.build(expandedIds, tasks);

ActionListener<StopDataFrameAnalyticsAction.Response> finalListener = ActionListener.wrap(
r -> waitForTaskRemoved(expandedIds, request, r, listener),
listener::onFailure
);
if (analyticsByTaskState.isEmpty()) {
listener.onResponse(new StopDataFrameAnalyticsAction.Response(true));
return;
}

super.doExecute(task, request, finalListener);
if (request.isForce()) {
forceStop(request, listener, tasks, analyticsByTaskState.getNonStopped());
} else {
normalStop(task, request, listener, tasks, analyticsByTaskState);
}
},
listener::onFailure
);

expandIds(state, request, expandedIdsListener);
}

/** Visible for testing */
static Set<String> findAnalyticsToStop(PersistentTasksCustomMetaData tasks, Set<String> ids, boolean force) {
    Set<String> startedAnalytics = new HashSet<>();
    Set<String> stoppingAnalytics = new HashSet<>();
    Set<String> failedAnalytics = new HashSet<>();
    sortAnalyticsByTaskState(ids, tasks, startedAnalytics, stoppingAnalytics, failedAnalytics);

    // A job in FAILED state can only be stopped with force.
    if (force == false && failedAnalytics.isEmpty() == false) {
        ElasticsearchStatusException e = failedAnalytics.size() == 1 ? ExceptionsHelper.conflictStatusException(
            "cannot close data frame analytics [{}] because it failed, use force stop instead", failedAnalytics.iterator().next()) :
            ExceptionsHelper.conflictStatusException("one or more data frame analytics are in failed state, " +
                "use force stop instead");
        throw e;
    }

    startedAnalytics.addAll(failedAnalytics);
    if (force) {
        // Bug fix: stoppingAnalytics used to be collected but never included,
        // so force stopping a job already in STOPPING state was silently
        // dropped and the request hung waiting for a task that never went away.
        startedAnalytics.addAll(stoppingAnalytics);
    }
    return startedAnalytics;
}

/**
 * Buckets each analytics id into one of the given sets according to the
 * state of its persistent task. STOPPED jobs (and any unrecognised state)
 * are deliberately left out as there is nothing to stop for them.
 */
private static void sortAnalyticsByTaskState(Set<String> analyticsIds, PersistentTasksCustomMetaData tasks,
                                             Set<String> startedAnalytics, Set<String> stoppingAnalytics,
                                             Set<String> failedAnalytics) {
    for (String analyticsId : analyticsIds) {
        DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(analyticsId, tasks);
        if (state == DataFrameAnalyticsState.STOPPING) {
            stoppingAnalytics.add(analyticsId);
        } else if (state == DataFrameAnalyticsState.FAILED) {
            failedAnalytics.add(analyticsId);
        } else if (state == DataFrameAnalyticsState.STARTING
                || state == DataFrameAnalyticsState.STARTED
                || state == DataFrameAnalyticsState.REINDEXING
                || state == DataFrameAnalyticsState.ANALYZING) {
            startedAnalytics.add(analyticsId);
        }
        // STOPPED or anything else: nothing to do.
    }
}

private void expandIds(ClusterState clusterState, StopDataFrameAnalyticsAction.Request request,
ActionListener<Set<String>> expandedIdsListener) {
ActionListener<List<DataFrameAnalyticsConfig>> configsListener = ActionListener.wrap(
Expand All @@ -179,7 +136,107 @@ private void expandIds(ClusterState clusterState, StopDataFrameAnalyticsAction.R
configProvider.getMultiple(request.getId(), request.allowNoMatch(), configsListener);
}

private String[] findAllocatedNodesAndRemoveUnassignedTasks(Set<String> analyticsIds, PersistentTasksCustomMetaData tasks) {
/**
 * Handles a non-force stop: rejects the request if any job is FAILED,
 * otherwise issues the stop to the allocated nodes and waits for the
 * persistent tasks of both started and stopping jobs to be removed.
 */
private void normalStop(Task task, StopDataFrameAnalyticsAction.Request request,
                        ActionListener<StopDataFrameAnalyticsAction.Response> listener,
                        PersistentTasksCustomMetaData tasks, AnalyticsByTaskState analyticsByTaskState) {
    // Failed jobs cannot be stopped normally; the user must use force stop.
    List<String> failed = analyticsByTaskState.failed;
    if (failed.isEmpty() == false) {
        ElasticsearchStatusException error;
        if (failed.size() == 1) {
            error = ExceptionsHelper.conflictStatusException(
                "cannot close data frame analytics [{}] because it failed, use force stop instead", failed.iterator().next());
        } else {
            error = ExceptionsHelper.conflictStatusException(
                "one or more data frame analytics are in failed state, use force stop instead");
        }
        listener.onFailure(error);
        return;
    }

    request.setExpandedIds(new HashSet<>(analyticsByTaskState.started));
    request.setNodes(findAllocatedNodesAndRemoveUnassignedTasks(analyticsByTaskState.started, tasks));

    // Wait for the persistent tasks of started AND stopping jobs to disappear.
    Set<String> tasksToWaitFor = new HashSet<>();
    for (String analyticsId : analyticsByTaskState.started) {
        tasksToWaitFor.add(MlTasks.dataFrameAnalyticsTaskId(analyticsId));
    }
    for (String analyticsId : analyticsByTaskState.stopping) {
        tasksToWaitFor.add(MlTasks.dataFrameAnalyticsTaskId(analyticsId));
    }

    ActionListener<StopDataFrameAnalyticsAction.Response> delegate = ActionListener.wrap(
        response -> waitForTaskRemoved(tasksToWaitFor, request, response, listener),
        e -> {
            if (ExceptionsHelper.unwrapCause(e) instanceof FailedNodeException) {
                // A node has dropped out of the cluster since we started executing the requests.
                // Since stopping an already stopped analytics is not an error we can try again.
                // The analytics that were running on the node that dropped out of the cluster
                // will just have their persistent tasks cancelled. Analytics that were stopped
                // by the previous attempt will be noops in the subsequent attempt.
                doExecute(task, request, listener);
            } else {
                listener.onFailure(e);
            }
        }
    );

    super.doExecute(task, request, delegate);
}

/**
 * Force-stops the given non-stopped analytics by removing their persistent tasks
 * directly via {@code persistentTasksService.sendRemoveRequest}, bypassing the
 * normal stop protocol. The overall response is sent once every removal request
 * has completed (successfully or not).
 */
private void forceStop(StopDataFrameAnalyticsAction.Request request, ActionListener<StopDataFrameAnalyticsAction.Response> listener,
                       PersistentTasksCustomMetaData tasks, List<String> nonStoppedAnalytics) {

    // counter counts completed removal attempts (success or failure); whichever
    // callback observes the final count sends the overall response.
    final AtomicInteger counter = new AtomicInteger();
    // One slot per analytics id; only genuine failures are recorded, so empty
    // slots are expected and filtered out by AtomicArray.asList downstream.
    final AtomicArray<Exception> failures = new AtomicArray<>(nonStoppedAnalytics.size());

    for (String analyticsId : nonStoppedAnalytics) {
        PersistentTasksCustomMetaData.PersistentTask<?> analyticsTask = MlTasks.getDataFrameAnalyticsTask(analyticsId, tasks);
        if (analyticsTask != null) {
            persistentTasksService.sendRemoveRequest(analyticsTask.getId(), ActionListener.wrap(
                removedTask -> {
                    if (counter.incrementAndGet() == nonStoppedAnalytics.size()) {
                        sendResponseOrFailure(request.getId(), listener, failures);
                    }
                },
                e -> {
                    final int slot = counter.incrementAndGet();
                    // We validated that the analytics ids supplied in the request existed when we started processing the action.
                    // If the related tasks don't exist at this point then they must have been stopped by a simultaneous stop request.
                    // This is not an error.
                    if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
                        failures.set(slot - 1, e);
                    }
                    if (slot == nonStoppedAnalytics.size()) {
                        sendResponseOrFailure(request.getId(), listener, failures);
                    }
                }
            ));
        } else {
            // This should not happen, because nonStoppedAnalytics
            // were derived from the same tasks that were passed to this method
            String msg = "Requested data frame analytics [" + analyticsId + "] be force-stopped, but no task could be found.";
            assert analyticsTask != null : msg;
            logger.error(msg);
            // Record the failure and still advance the counter so the overall
            // response is not blocked by the missing task.
            final int slot = counter.incrementAndGet();
            failures.set(slot - 1, new RuntimeException(msg));
            if (slot == nonStoppedAnalytics.size()) {
                sendResponseOrFailure(request.getId(), listener, failures);
            }
        }
    }
}

/**
 * Completes the stop request: a success response when no failure was
 * recorded, otherwise an exception summarising every failure message with
 * the first recorded failure as its cause.
 */
private void sendResponseOrFailure(String analyticsId, ActionListener<StopDataFrameAnalyticsAction.Response> listener,
                                   AtomicArray<Exception> failures) {
    List<Exception> caught = failures.asList();
    if (caught.isEmpty()) {
        listener.onResponse(new StopDataFrameAnalyticsAction.Response(true));
    } else {
        String joinedMessages = caught.stream().map(Exception::getMessage).collect(Collectors.joining(", "));
        String msg = "Failed to stop data frame analytics [" + analyticsId + "] with [" + caught.size()
            + "] failures, rethrowing last, all Exceptions: ["
            + joinedMessages
            + "]";
        listener.onFailure(new ElasticsearchException(msg, caught.get(0)));
    }
}

private String[] findAllocatedNodesAndRemoveUnassignedTasks(List<String> analyticsIds, PersistentTasksCustomMetaData tasks) {
List<String> nodes = new ArrayList<>();
for (String analyticsId : analyticsIds) {
PersistentTasksCustomMetaData.PersistentTask<?> task = MlTasks.getDataFrameAnalyticsTask(analyticsId, tasks);
Expand Down Expand Up @@ -259,11 +316,11 @@ protected void doRun() {
}));
}

void waitForTaskRemoved(Set<String> analyticsIds, StopDataFrameAnalyticsAction.Request request,
StopDataFrameAnalyticsAction.Response response,
ActionListener<StopDataFrameAnalyticsAction.Response> listener) {
void waitForTaskRemoved(Set<String> taskIds, StopDataFrameAnalyticsAction.Request request,
StopDataFrameAnalyticsAction.Response response,
ActionListener<StopDataFrameAnalyticsAction.Response> listener) {
persistentTasksService.waitForPersistentTasksCondition(persistentTasks ->
filterPersistentTasks(persistentTasks, analyticsIds).isEmpty(),
persistentTasks.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, t -> taskIds.contains(t.getId())).isEmpty(),
request.getTimeout(), ActionListener.wrap(
booleanResponse -> {
auditor.info(request.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_STOPPED);
Expand All @@ -273,9 +330,58 @@ void waitForTaskRemoved(Set<String> analyticsIds, StopDataFrameAnalyticsAction.R
));
}

private static Collection<PersistentTasksCustomMetaData.PersistentTask<?>> filterPersistentTasks(
PersistentTasksCustomMetaData persistentTasks, Set<String> analyticsIds) {
return persistentTasks.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
t -> analyticsIds.contains(MlTasks.dataFrameAnalyticsIdFromTaskId(t.getId())));
// Visible for testing
static class AnalyticsByTaskState {

    // Ids bucketed by the state of their persistent task; all lists are immutable.
    final List<String> started;
    final List<String> stopping;
    final List<String> failed;

    private AnalyticsByTaskState(List<String> started, List<String> stopping, List<String> failed) {
        this.started = Collections.unmodifiableList(started);
        this.stopping = Collections.unmodifiableList(stopping);
        this.failed = Collections.unmodifiableList(failed);
    }

    /** Whether no id has a task in a stoppable state. */
    boolean isEmpty() {
        return started.isEmpty() && stopping.isEmpty() && failed.isEmpty();
    }

    /** All ids whose task is not already stopped, in started/stopping/failed order. */
    List<String> getNonStopped() {
        List<String> nonStopped = new ArrayList<>(started.size() + stopping.size() + failed.size());
        nonStopped.addAll(started);
        nonStopped.addAll(stopping);
        nonStopped.addAll(failed);
        return nonStopped;
    }

    /** Buckets the given ids by the state of their persistent task in {@code tasks}. */
    static AnalyticsByTaskState build(Set<String> analyticsIds, PersistentTasksCustomMetaData tasks) {
        List<String> started = new ArrayList<>();
        List<String> stopping = new ArrayList<>();
        List<String> failed = new ArrayList<>();

        for (String analyticsId : analyticsIds) {
            DataFrameAnalyticsState taskState = MlTasks.getDataFrameAnalyticsState(analyticsId, tasks);
            if (taskState == DataFrameAnalyticsState.STARTING
                    || taskState == DataFrameAnalyticsState.STARTED
                    || taskState == DataFrameAnalyticsState.REINDEXING
                    || taskState == DataFrameAnalyticsState.ANALYZING) {
                started.add(analyticsId);
            } else if (taskState == DataFrameAnalyticsState.STOPPING) {
                stopping.add(analyticsId);
            } else if (taskState == DataFrameAnalyticsState.FAILED) {
                failed.add(analyticsId);
            } else if (taskState != DataFrameAnalyticsState.STOPPED) {
                assert false : "unknown task state " + taskState;
            }
        }
        return new AnalyticsByTaskState(started, stopping, failed);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
Expand Down Expand Up @@ -217,7 +219,14 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
reindexResponse.getTook()));
startAnalytics(task, config);
},
error -> task.setFailed(ExceptionsHelper.unwrapCause(error).getMessage())
error -> {
if (error instanceof TaskCancelledException && task.isStopping()) {
LOGGER.debug(new ParameterizedMessage("[{}] Caught task cancelled exception while task is stopping",
config.getId()), error);
} else {
task.setFailed(ExceptionsHelper.unwrapCause(error).getMessage());
}
}
);

// Reindex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public StatsHolder getStatsHolder() {
@Override
protected void onCancelled() {
    // Treat cancellation as an immediate stop request (zero timeout), then
    // mark the persistent task as completed so the framework removes it.
    // NOTE(review): per the commit message, the missing markAsCompleted() was
    // what left force-stopped tasks lingering — confirm against the framework docs.
    stop(getReasonCancelled(), TimeValue.ZERO);
    markAsCompleted();
}

@Override
Expand Down

0 comments on commit 8579eda

Please sign in to comment.