Skip to content

Commit

Permalink
[ML][Transforms] signal listner early on stop failure (#47954)
Browse files Browse the repository at this point in the history
  • Loading branch information
benwtrent committed Oct 14, 2019
1 parent 63f835d commit 46a5164
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.xpack.core.transform.TransformMessages.CANNOT_STOP_FAILED_TRANSFORM;

Expand Down Expand Up @@ -196,6 +198,13 @@ private ActionListener<Response> waitForStopListener(Request request, ActionList
);
return ActionListener.wrap(
response -> {
// If there were failures attempting to stop the tasks, we don't know if they will actually stop.
// It is better to respond to the user now than allow for the persistent task waiting to timeout
if (response.getTaskFailures().isEmpty() == false || response.getNodeFailures().isEmpty() == false) {
RestStatus status = firstNotOKStatus(response.getTaskFailures(), response.getNodeFailures());
listener.onFailure(buildException(response.getTaskFailures(), response.getNodeFailures(), status));
return;
}
// Wait until the persistent task is stopped
// Switch over to Generic threadpool so we don't block the network thread
threadPool.generic().execute(() ->
Expand All @@ -205,6 +214,46 @@ private ActionListener<Response> waitForStopListener(Request request, ActionList
);
}

static ElasticsearchStatusException buildException(List<TaskOperationFailure> taskOperationFailures,
List<ElasticsearchException> elasticsearchExceptions,
RestStatus status) {
List<Exception> exceptions = Stream.concat(
taskOperationFailures.stream().map(TaskOperationFailure::getCause),
elasticsearchExceptions.stream()).collect(Collectors.toList());

ElasticsearchStatusException elasticsearchStatusException =
new ElasticsearchStatusException(exceptions.get(0).getMessage(), status);

for (int i = 1; i < exceptions.size(); i++) {
elasticsearchStatusException.addSuppressed(exceptions.get(i));
}
return elasticsearchStatusException;
}

static RestStatus firstNotOKStatus(List<TaskOperationFailure> taskOperationFailures, List<ElasticsearchException> exceptions) {
RestStatus status = RestStatus.OK;

for (TaskOperationFailure taskOperationFailure : taskOperationFailures) {
status = taskOperationFailure.getStatus();
if (RestStatus.OK.equals(status) == false) {
break;
}
}
if (status == RestStatus.OK) {
for (ElasticsearchException exception : exceptions) {
// As it stands right now, this will ALWAYS be INTERNAL_SERVER_ERROR.
// FailedNodeException does not overwrite the `status()` method and the logic in ElasticsearchException
// Just returns an INTERNAL_SERVER_ERROR
status = exception.status();
if (RestStatus.OK.equals(status) == false) {
break;
}
}
}
// If all the previous exceptions don't have a valid status, we have an unknown error.
return status == RestStatus.OK ? RestStatus.INTERNAL_SERVER_ERROR : status;
}

private void waitForTransformStopped(Set<String> persistentTaskIds,
TimeValue timeout,
boolean force,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,26 @@
*/
package org.elasticsearch.xpack.transform.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.rest.RestStatus.CONFLICT;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -91,4 +96,67 @@ public void testTaskStateValidationWithTransformTasks() {
"task has failed")));
}

public void testFirstNotOKStatus() {
List<ElasticsearchException> nodeFailures = new ArrayList<>();
List<TaskOperationFailure> taskOperationFailures = new ArrayList<>();

nodeFailures.add(new ElasticsearchException("nodefailure",
new ElasticsearchStatusException("failure", RestStatus.UNPROCESSABLE_ENTITY)));
taskOperationFailures.add(new TaskOperationFailure("node",
1,
new ElasticsearchStatusException("failure", RestStatus.BAD_REQUEST)));

assertThat(TransportStopTransformAction.firstNotOKStatus(Collections.emptyList(), Collections.emptyList()),
equalTo(RestStatus.INTERNAL_SERVER_ERROR));

assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, Collections.emptyList()),
equalTo(RestStatus.BAD_REQUEST));
assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, nodeFailures),
equalTo(RestStatus.BAD_REQUEST));
assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures,
Collections.singletonList(new ElasticsearchException(new ElasticsearchStatusException("not failure", RestStatus.OK)))),
equalTo(RestStatus.BAD_REQUEST));

assertThat(TransportStopTransformAction.firstNotOKStatus(
Collections.singletonList(new TaskOperationFailure(
"node",
1,
new ElasticsearchStatusException("not failure", RestStatus.OK))),
nodeFailures),
equalTo(RestStatus.INTERNAL_SERVER_ERROR));

assertThat(TransportStopTransformAction.firstNotOKStatus(
Collections.emptyList(),
nodeFailures),
equalTo(RestStatus.INTERNAL_SERVER_ERROR));
}

public void testBuildException() {
List<ElasticsearchException> nodeFailures = new ArrayList<>();
List<TaskOperationFailure> taskOperationFailures = new ArrayList<>();

nodeFailures.add(new ElasticsearchException("node failure"));
taskOperationFailures.add(new TaskOperationFailure("node",
1,
new ElasticsearchStatusException("task failure", RestStatus.BAD_REQUEST)));

RestStatus status = CONFLICT;
ElasticsearchStatusException statusException =
TransportStopTransformAction.buildException(taskOperationFailures, nodeFailures, status);

assertThat(statusException.status(), equalTo(status));
assertThat(statusException.getMessage(), equalTo(taskOperationFailures.get(0).getCause().getMessage()));
assertThat(statusException.getSuppressed().length, equalTo(1));

statusException = TransportStopTransformAction.buildException(Collections.emptyList(), nodeFailures, status);
assertThat(statusException.status(), equalTo(status));
assertThat(statusException.getMessage(), equalTo(nodeFailures.get(0).getMessage()));
assertThat(statusException.getSuppressed().length, equalTo(0));

statusException = TransportStopTransformAction.buildException(taskOperationFailures, Collections.emptyList(), status);
assertThat(statusException.status(), equalTo(status));
assertThat(statusException.getMessage(), equalTo(taskOperationFailures.get(0).getCause().getMessage()));
assertThat(statusException.getSuppressed().length, equalTo(0));
}

}

0 comments on commit 46a5164

Please sign in to comment.