diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java index ab8c79a1784ba..08cec05db8718 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java @@ -41,6 +41,7 @@ import static org.elasticsearch.client.transform.DeleteDataFrameTransformRequest.FORCE; import static org.elasticsearch.client.transform.GetDataFrameTransformRequest.ALLOW_NO_MATCH; import static org.elasticsearch.client.transform.PutDataFrameTransformRequest.DEFER_VALIDATION; +import static org.elasticsearch.client.transform.StopDataFrameTransformRequest.WAIT_FOR_CHECKPOINT; final class DataFrameRequestConverters { @@ -135,6 +136,9 @@ static Request stopDataFrameTransform(StopDataFrameTransformRequest stopRequest) if (stopRequest.getAllowNoMatch() != null) { request.addParameter(ALLOW_NO_MATCH, stopRequest.getAllowNoMatch().toString()); } + if (stopRequest.getWaitForCheckpoint() != null) { + request.addParameter(WAIT_FOR_CHECKPOINT, stopRequest.getWaitForCheckpoint().toString()); + } request.addParameters(params.asMap()); return request; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/StopDataFrameTransformRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/StopDataFrameTransformRequest.java index 3a662c2caec26..5f2745dbe2a48 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/StopDataFrameTransformRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/StopDataFrameTransformRequest.java @@ -28,21 +28,23 @@ public class StopDataFrameTransformRequest implements Validatable { + public static final String WAIT_FOR_CHECKPOINT = "wait_for_checkpoint"; + private final String id; private Boolean waitForCompletion; private TimeValue timeout; private Boolean allowNoMatch; + private Boolean waitForCheckpoint; public StopDataFrameTransformRequest(String id) { - this.id = id; - waitForCompletion = null; - timeout = null; + this(id, null, null, null); } - public StopDataFrameTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout) { + public StopDataFrameTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout, Boolean waitForCheckpoint) { this.id = id; this.waitForCompletion = waitForCompletion; this.timeout = timeout; + this.waitForCheckpoint = waitForCheckpoint; } public String getId() { @@ -73,6 +75,14 @@ public void setAllowNoMatch(Boolean allowNoMatch) { this.allowNoMatch = allowNoMatch; } + public Boolean getWaitForCheckpoint() { + return waitForCheckpoint; + } + + public void setWaitForCheckpoint(Boolean waitForCheckpoint) { + this.waitForCheckpoint = waitForCheckpoint; + } + @Override public Optional validate() { if (id == null) { @@ -86,7 +96,7 @@ public Optional validate() { @Override public int hashCode() { - return Objects.hash(id, waitForCompletion, timeout, allowNoMatch); + return Objects.hash(id, waitForCompletion, timeout, allowNoMatch, waitForCheckpoint); } @Override @@ -102,7 +112,8 @@ public boolean equals(Object obj) { return Objects.equals(this.id, other.id) && Objects.equals(this.waitForCompletion, other.waitForCompletion) && Objects.equals(this.timeout, other.timeout) - && Objects.equals(this.allowNoMatch, other.allowNoMatch); + && Objects.equals(this.allowNoMatch, other.allowNoMatch) + && Objects.equals(this.waitForCheckpoint, other.waitForCheckpoint); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java index 4c4b1d8f709ba..ede167d581c18 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java @@ -149,7 +149,13 @@ public void testStopDataFrameTransform() { if (randomBoolean()) { timeValue = TimeValue.parseTimeValue(randomTimeValue(), "timeout"); } - StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, waitForCompletion, timeValue); + + Boolean waitForCheckpoint = null; + if (randomBoolean()) { + waitForCheckpoint = randomBoolean(); + } + + StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, waitForCompletion, timeValue, waitForCheckpoint); Request request = DataFrameRequestConverters.stopDataFrameTransform(stopRequest); assertEquals(HttpPost.METHOD_NAME, request.getMethod()); @@ -169,6 +175,13 @@ public void testStopDataFrameTransform() { assertFalse(request.getParameters().containsKey("timeout")); } + if (waitForCheckpoint != null) { + assertTrue(request.getParameters().containsKey("wait_for_checkpoint")); + assertEquals(stopRequest.getWaitForCheckpoint(), Boolean.parseBoolean(request.getParameters().get("wait_for_checkpoint"))); + } else { + assertFalse(request.getParameters().containsKey("wait_for_checkpoint")); + } + assertFalse(request.getParameters().containsKey(ALLOW_NO_MATCH)); stopRequest.setAllowNoMatch(randomBoolean()); request = DataFrameRequestConverters.stopDataFrameTransform(stopRequest); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index b532b877fa957..658d379eddc26 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -147,7 +147,7 @@ private void indexData(String indexName) throws IOException { public void cleanUpTransforms() throws Exception { for (String transformId : transformsToClean) { highLevelClient().dataFrame().stopDataFrameTransform( - new StopDataFrameTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT); + new StopDataFrameTransformRequest(transformId, true, null, false), RequestOptions.DEFAULT); } for (String transformId : transformsToClean) { @@ -310,7 +310,7 @@ public void testStartStop() throws IOException { assertThat(taskState, oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING, DataFrameTransformStats.State.STOPPING, DataFrameTransformStats.State.STOPPED)); - StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null); + StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, true, null, false); StopDataFrameTransformResponse stopResponse = execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync); assertTrue(stopResponse.isAcknowledged()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index d4cef221401d6..49fef176b1a4a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -81,7 +81,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest public void cleanUpTransforms() throws Exception { for (String transformId : transformsToClean) { highLevelClient().dataFrame().stopDataFrameTransform( - new StopDataFrameTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT); + new StopDataFrameTransformRequest(transformId, true, TimeValue.timeValueSeconds(20), false), RequestOptions.DEFAULT); } for (String transformId : transformsToClean) { diff --git a/docs/reference/data-frames/apis/stop-transform.asciidoc b/docs/reference/data-frames/apis/stop-transform.asciidoc index 3ce9c56a25425..a55063b3db3fa 100644 --- a/docs/reference/data-frames/apis/stop-transform.asciidoc +++ b/docs/reference/data-frames/apis/stop-transform.asciidoc @@ -81,6 +81,11 @@ are no matches or only partial matches. state completely stops. If set to `false`, the API returns immediately and the indexer will be stopped asynchronously in the background. Defaults to `false`. +`wait_for_checkpoint`:: + (Optional, boolean) If set to `true`, the transform will not completely stop + until the current checkpoint is completed. If set to `false`, the transform + stops as soon as possible. Defaults to `true`. + [[stop-data-frame-transform-response-codes]] ==== {api-response-codes-title} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java index 54abc9799e354..8721a014a3606 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java @@ -22,6 +22,7 @@ public final class TransformField { public static final ParseField GROUP_BY = new ParseField("group_by"); public static final ParseField TIMEOUT = new ParseField("timeout"); public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion"); + public static final ParseField WAIT_FOR_CHECKPOINT = new ParseField("wait_for_checkpoint"); public static final ParseField STATS_FIELD = new ParseField("stats"); public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type"); public static final ParseField SOURCE = new ParseField("source"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java index 4fd3ce7f54de8..dfd9ec35832a1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java @@ -46,11 +46,17 @@ private StopTransformAction() { public static class Request extends BaseTasksRequest { private final String id; private final boolean waitForCompletion; + private final boolean waitForCheckpoint; private final boolean force; private final boolean allowNoMatch; private Set expandedIds; - public Request(String id, boolean waitForCompletion, boolean force, @Nullable TimeValue timeout, boolean allowNoMatch) { + public Request(String id, + boolean waitForCompletion, + boolean force, + @Nullable TimeValue timeout, + boolean allowNoMatch, + boolean waitForCheckpoint) { this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName()); this.waitForCompletion = waitForCompletion; this.force = force; @@ -58,6 +64,7 @@ public Request(String id, boolean waitForCompletion, boolean force, @Nullable Ti // use the timeout value already present in BaseTasksRequest this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout); this.allowNoMatch = allowNoMatch; + this.waitForCheckpoint = waitForCheckpoint; } public Request(StreamInput in) throws IOException { @@ -73,6 +80,11 @@ public Request(StreamInput in) throws IOException { } else { this.allowNoMatch = true; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.waitForCheckpoint = in.readBoolean(); + } else { + this.waitForCheckpoint = true; + } } public String getId() { @@ -99,6 +111,10 @@ public boolean isAllowNoMatch() { return allowNoMatch; } + public boolean isWaitForCheckpoint() { + return waitForCheckpoint; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -113,6 +129,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_3_0)) { out.writeBoolean(allowNoMatch); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(waitForCheckpoint); + } } @Override @@ -123,7 +142,7 @@ public ActionRequestValidationException validate() { @Override public int hashCode() { // the base class does not implement hashCode, therefore we need to hash timeout ourselves - return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch); + return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch, waitForCheckpoint); } @Override @@ -146,7 +165,8 @@ public boolean equals(Object obj) { Objects.equals(waitForCompletion, other.waitForCompletion) && Objects.equals(force, other.force) && Objects.equals(expandedIds, other.expandedIds) && - allowNoMatch == other.allowNoMatch; + allowNoMatch == other.allowNoMatch && + waitForCheckpoint == other.waitForCheckpoint; } @Override @@ -157,7 +177,6 @@ public boolean match(Task task) { return expandedIds.contains(id); } } - return false; } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java index 29ae1fe3968c4..d98a08f5e8461 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java @@ -43,6 +43,9 @@ public class TransformState implements Task.Status, PersistentTaskState { @Nullable private NodeAttributes node; + // TODO: 8.x this needs to be deprecated and we move towards a STOPPING TASK_STATE + private final boolean shouldStopAtNextCheckpoint; + public static final ParseField TASK_STATE = new ParseField("task_state"); public static final ParseField INDEXER_STATE = new ParseField("indexer_state"); @@ -53,6 +56,8 @@ public class TransformState implements Task.Status, PersistentTaskState { public static final ParseField REASON = new ParseField("reason"); public static final ParseField PROGRESS = new ParseField("progress"); public static final ParseField NODE = new ParseField("node"); + public static final ParseField SHOULD_STOP_AT_NEXT_CHECKPOINT = new ParseField("should_stop_at_checkpoint"); + @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, @@ -72,8 +77,16 @@ public class TransformState implements Task.Status, PersistentTaskState { String reason = (String) args[5]; TransformProgress progress = (TransformProgress) args[6]; NodeAttributes node = (NodeAttributes) args[7]; - - return new TransformState(taskState, indexerState, transformIndexerPosition, checkpoint, reason, progress, node); + boolean shouldStopAtNextCheckpoint = args[8] == null ? false : (boolean)args[8]; + + return new TransformState(taskState, + indexerState, + transformIndexerPosition, + checkpoint, + reason, + progress, + node, + shouldStopAtNextCheckpoint); }); static { @@ -85,6 +98,7 @@ public class TransformState implements Task.Status, PersistentTaskState { PARSER.declareString(optionalConstructorArg(), REASON); PARSER.declareField(optionalConstructorArg(), TransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT); PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE, ValueType.OBJECT); + PARSER.declareBoolean(optionalConstructorArg(), SHOULD_STOP_AT_NEXT_CHECKPOINT); } public TransformState(TransformTaskState taskState, @@ -93,7 +107,8 @@ public TransformState(TransformTaskState taskState, long checkpoint, @Nullable String reason, @Nullable TransformProgress progress, - @Nullable NodeAttributes node) { + @Nullable NodeAttributes node, + boolean shouldStopAtNextCheckpoint) { this.taskState = taskState; this.indexerState = indexerState; this.position = position; @@ -101,6 +116,17 @@ public TransformState(TransformTaskState taskState, this.reason = reason; this.progress = progress; this.node = node; + this.shouldStopAtNextCheckpoint = shouldStopAtNextCheckpoint; + } + + public TransformState(TransformTaskState taskState, + IndexerState indexerState, + @Nullable TransformIndexerPosition position, + long checkpoint, + @Nullable String reason, + @Nullable TransformProgress progress, + @Nullable NodeAttributes node) { + this(taskState, indexerState, position, checkpoint, reason, progress, node, false); } public TransformState(TransformTaskState taskState, @@ -129,6 +155,11 @@ public TransformState(StreamInput in) throws IOException { } else { node = null; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + shouldStopAtNextCheckpoint = in.readBoolean(); + } else { + shouldStopAtNextCheckpoint = false; + } } public TransformTaskState getTaskState() { @@ -164,6 +195,10 @@ public TransformState setNode(NodeAttributes node) { return this; } + public boolean shouldStopAtNextCheckpoint() { + return shouldStopAtNextCheckpoint; + } + public static TransformState fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); @@ -190,6 +225,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (node != null) { builder.field(NODE.getPreferredName(), node); } + builder.field(SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName(), shouldStopAtNextCheckpoint); builder.endObject(); return builder; } @@ -214,6 +250,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_3_0)) { out.writeOptionalWriteable(node); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(shouldStopAtNextCheckpoint); + } } @Override @@ -234,12 +273,13 @@ public boolean equals(Object other) { this.checkpoint == that.checkpoint && Objects.equals(this.reason, that.reason) && Objects.equals(this.progress, that.progress) && + Objects.equals(this.shouldStopAtNextCheckpoint, that.shouldStopAtNextCheckpoint) && Objects.equals(this.node, that.node); } @Override public int hashCode() { - return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node); + return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node, shouldStopAtNextCheckpoint); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskState.java index e807dafc8b435..0049f4a6272a8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskState.java @@ -14,6 +14,7 @@ import java.util.Locale; public enum TransformTaskState implements Writeable { + // TODO 8.x add a `STOPPING` state and BWC handling in ::fromString STOPPED, STARTED, FAILED; public static TransformTaskState fromString(String name) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StopTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StopTransformActionRequestTests.java index 40852fdf34e89..a76cdb28f32a0 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StopTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StopTransformActionRequestTests.java @@ -24,7 +24,12 @@ public class StopTransformActionRequestTests extends AbstractWireSerializingTest @Override protected Request createTestInstance() { TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null; - Request request = new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), randomBoolean(), timeout, randomBoolean()); + Request request = new Request(randomAlphaOfLengthBetween(1, 10), + randomBoolean(), + randomBoolean(), + timeout, + randomBoolean(), + randomBoolean()); if (randomBoolean()) { request.setExpandedIds(new HashSet<>(Arrays.asList(generateRandomStringArray(5, 6, false)))); } @@ -41,9 +46,10 @@ public void testSameButDifferentTimeout() { boolean waitForCompletion = randomBoolean(); boolean force = randomBoolean(); boolean allowNoMatch = randomBoolean(); + boolean waitForCheckpoint = randomBoolean(); - Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch); - Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch); + Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch, waitForCheckpoint); + Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch, waitForCheckpoint); assertNotEquals(r1,r2); assertNotEquals(r1.hashCode(),r2.hashCode()); @@ -56,11 +62,11 @@ public void testMatch() { TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + dataFrameId, TaskId.EMPTY_TASK_ID, Collections.emptyMap()); - Request request = new Request("unrelated", false, false, null, false); + Request request = new Request("unrelated", false, false, null, false, false); request.setExpandedIds(Set.of("foo", "bar")); assertFalse(request.match(dataFrameTask)); - Request matchingRequest = new Request(dataFrameId, false, false, null, false); + Request matchingRequest = new Request(dataFrameId, false, false, null, false, false); matchingRequest.setExpandedIds(Set.of(dataFrameId)); assertTrue(matchingRequest.match(dataFrameTask)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java index 7836505557541..7b849b757fab6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java @@ -6,6 +6,9 @@ package org.elasticsearch.xpack.core.transform.transforms; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -26,7 +29,8 @@ public static TransformState randomDataFrameTransformState() { randomLongBetween(0,10), randomBoolean() ? null : randomAlphaOfLength(10), randomBoolean() ? null : randomDataFrameTransformProgress(), - randomBoolean() ? null : randomNodeAttributes()); + randomBoolean() ? null : randomNodeAttributes(), + randomBoolean()); } @Override @@ -53,4 +57,24 @@ protected boolean supportsUnknownFields() { protected Predicate getRandomFieldsExcludeFilter() { return field -> !field.isEmpty(); } + + public void testBackwardsSerialization() throws IOException { + TransformState state = new TransformState(randomFrom(TransformTaskState.values()), + randomFrom(IndexerState.values()), + TransformIndexerPositionTests.randomDataFrameIndexerPosition(), + randomLongBetween(0,10), + randomBoolean() ? null : randomAlphaOfLength(10), + randomBoolean() ? null : randomDataFrameTransformProgress(), + randomBoolean() ? null : randomNodeAttributes(), + false); // Will be false after BWC deserialization + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.setVersion(Version.V_7_4_0); + state.writeTo(output); + try (StreamInput in = output.bytes().streamInput()) { + in.setVersion(Version.V_7_4_0); + TransformState streamedState = new TransformState(in); + assertEquals(state, streamedState); + } + } + } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.stop_data_frame_transform.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.stop_data_frame_transform.json index af0c35b156390..7dfed1e80d6f8 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.stop_data_frame_transform.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.stop_data_frame_transform.json @@ -35,6 +35,11 @@ "type":"boolean", "required":false, "description":"Whether to ignore if a wildcard expression matches no data frame transforms. (This includes `_all` string or when no data frame transforms have been specified)" + }, + "wait_for_checkpoint": { + "type":"boolean", + "required":false, + "description":"Whether to wait for the transform to reach a checkpoint before stopping. Default to true" } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 199146a8f0d1c..37a201639994f 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -33,6 +33,7 @@ setup: teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop" timeout: "10m" wait_for_completion: true @@ -107,6 +108,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop" wait_for_completion: true - match: { acknowledged: true } @@ -164,6 +166,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop-continuous" wait_for_completion: true - match: { acknowledged: true } @@ -189,6 +192,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop-continuous" wait_for_completion: true - match: { acknowledged: true } @@ -201,18 +205,21 @@ teardown: - do: catch: missing data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "missing-transform" --- "Test stop missing transform by expression": - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false allow_no_match: true transform_id: "missing-transform*" - do: catch: missing data_frame.stop_data_frame_transform: + wait_for_checkpoint: false allow_no_match: false transform_id: "missing-transform*" @@ -220,6 +227,7 @@ teardown: "Test stop already stopped transform": - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop" - match: { acknowledged: true } @@ -263,6 +271,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop" wait_for_completion: true - match: { acknowledged: true } @@ -277,6 +286,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-later" wait_for_completion: true - match: { acknowledged: true } @@ -311,6 +321,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "_all" wait_for_completion: true - match: { acknowledged: true } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml index b4699898d4833..f01079b848874 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml @@ -33,6 +33,7 @@ setup: teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-stats" wait_for_completion: true @@ -252,6 +253,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-stats-continuous" wait_for_completion: true diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java index 479926ea992bb..fdf3a47637338 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java @@ -86,8 +86,17 @@ protected void cleanUpTransforms() throws IOException { } protected StopDataFrameTransformResponse stopDataFrameTransform(String id) throws IOException { + return stopDataFrameTransform(id, true, null, false); + } + + protected StopDataFrameTransformResponse stopDataFrameTransform(String id, + boolean waitForCompletion, + TimeValue timeout, + boolean waitForCheckpoint) throws IOException { RestHighLevelClient restClient = new TestRestHighLevelClient(); - return restClient.dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(id, true, null), RequestOptions.DEFAULT); + return restClient.dataFrame() + .stopDataFrameTransform(new StopDataFrameTransformRequest(id, waitForCompletion, timeout, waitForCheckpoint), + RequestOptions.DEFAULT); } protected StartDataFrameTransformResponse startDataFrameTransform(String id, RequestOptions options) throws IOException { @@ -298,7 +307,7 @@ protected void createReviewsIndex(String indexName, int numDocs) throws Exceptio .append("\"}"); bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON)); - if (i % 50 == 0) { + if (i % 100 == 0) { BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT); assertThat(response.buildFailureMessage(), response.hasFailures(), is(false)); bulk = new BulkRequest(indexName); diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java index 7cdb53515342d..ca21ce20faa01 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java @@ -235,6 +235,49 @@ public void testContinuousDataFrameTransformUpdate() throws Exception { deleteDataFrameTransform(config.getId()); } + public void testStopWaitForCheckpoint() throws Exception { + String indexName = "wait-for-checkpoint-reviews"; + String transformId = "data-frame-transform-wait-for-checkpoint"; + createReviewsIndex(indexName, 1000); + + Map groups = new HashMap<>(); + groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null)); + groups.put("by-user", TermsGroupSource.builder().setField("user_id").build()); + groups.put("by-business", TermsGroupSource.builder().setField("business_id").build()); + + AggregatorFactories.Builder aggs = AggregatorFactories.builder() + .addAggregator(AggregationBuilders.avg("review_score").field("stars")) + .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); + + DataFrameTransformConfig config = createTransformConfigBuilder(transformId, + groups, + aggs, + "reviews-by-user-business-day", + QueryBuilders.matchAllQuery(), + indexName) + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) + .build(); + + assertTrue(putDataFrameTransform(config, RequestOptions.DEFAULT).isAcknowledged()); + assertTrue(startDataFrameTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + + // waitForCheckpoint: true should make the transform continue until we hit the first checkpoint, then it will stop + stopDataFrameTransform(transformId, false, null, true); + + // Wait until the first checkpoint + waitUntilCheckpoint(config.getId(), 1L); + + // Even though we are continuous, we should be stopped now as we needed to stop at the first checkpoint + assertBusy(() -> { + DataFrameTransformStats stateAndStats = getDataFrameTransformStats(config.getId()).getTransformsStats().get(0); + assertThat(stateAndStats.getState(), equalTo(DataFrameTransformStats.State.STOPPED)); + assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(1000L)); + }); + + stopDataFrameTransform(config.getId()); + deleteDataFrameTransform(config.getId()); + } + private void indexMoreDocs(long timestamp, long userId, String index) throws Exception { BulkRequest bulk = new BulkRequest(index); for (int i = 0; i < 25; i++) { diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index aca70d146fa3b..4b7d90c9b7ca1 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.dataframe.integration; import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.junit.Before; @@ -857,6 +858,60 @@ public void testManyBucketsWithSmallPageSize() throws Exception { assertEquals(101, ((List)XContentMapValues.extractValue("transforms.stats.pages_processed", stats)).get(0)); } + public void testContinuousStopWaitForCheckpoint() throws Exception { + Request updateLoggingLevels = new Request("PUT", "/_cluster/settings"); + updateLoggingLevels.setJsonEntity( + "{\"transient\": {" + + "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," + + "\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}"); + client().performRequest(updateLoggingLevels); + String indexName = "continuous_reviews_wait_for_checkpoint"; + createReviewsIndex(indexName); + String transformId = "simple_continuous_pivot_wait_for_checkpoint"; + String dataFrameIndex = "pivot_reviews_continuous_wait_for_checkpoint"; + setupDataAccessRole(DATA_ACCESS_ROLE, indexName, dataFrameIndex); + final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, + BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + String config = "{" + + " \"source\": {\"index\":\"" + indexName + "\"}," + + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + + " \"frequency\": \"1s\"," + + " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}}," + + " \"pivot\": {" + + " \"group_by\": {" + + " \"reviewer\": {" + + " \"terms\": {" + + " \"field\": \"user_id\"" + + " } } }," + + " \"aggregations\": {" + + " \"avg_rating\": {" + + " \"avg\": {" + + " \"field\": \"stars\"" + + " } } } }" + + "}"; + createDataframeTransformRequest.setJsonEntity(config); + Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); + assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + startAndWaitForContinuousTransform(transformId, dataFrameIndex, null); + assertTrue(indexExists(dataFrameIndex)); + assertBusy(() -> { + try { + stopDataFrameTransform(transformId, false, true); + } catch (ResponseException e) { + // We get a conflict sometimes depending on WHEN we try to write the state, should eventually pass though + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(200)); + } + }); + + // get and check some users + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918); + } + private void assertOnePivotValue(String query, double expected) throws IOException { Map searchResult = getAsMap(query); diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index f0e4e5fc6867f..fc9c95f599be2 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -240,10 +240,14 @@ protected void startDataframeTransform(String transformId, boolean force, String } protected void stopDataFrameTransform(String transformId, boolean force) throws Exception { - // start the transform + stopDataFrameTransform(transformId, force, false); + } + + protected void stopDataFrameTransform(String transformId, boolean force, boolean waitForCheckpoint) throws Exception { final Request stopTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_stop", null); stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(force)); stopTransformRequest.addParameter(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(true)); + stopTransformRequest.addParameter(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), Boolean.toString(waitForCheckpoint)); Map stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest)); assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteDataFrameTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteDataFrameTransformAction.java index 60740e0fe35b2..870c535f3c97b 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteDataFrameTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteDataFrameTransformAction.java @@ -85,7 +85,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, StopTransformAction.INSTANCE, - new StopTransformAction.Request(request.getId(), true, true, null, true), + new StopTransformAction.Request(request.getId(), true, true, null, true, false), ActionListener.wrap( r -> stopTransformActionListener.onResponse(null), stopTransformActionListener::onFailure)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopDataFrameTransformAction.java index 412a23038dee7..1f60cafc3ab94 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopDataFrameTransformAction.java @@ -46,6 +46,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.xpack.core.transform.TransformMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM; @@ -134,7 +136,6 @@ protected void doExecute(Task task, Request request, ActionListener li @Override protected void taskOperation(Request request, DataFrameTransformTask transformTask, ActionListener listener) { - Set ids = request.getExpandedIds(); if (ids == null) { listener.onFailure(new IllegalStateException("Request does not have expandedIds set")); @@ -142,13 +143,24 @@ protected void taskOperation(Request request, DataFrameTransformTask transformTa } if (ids.contains(transformTask.getTransformId())) { - try { - transformTask.stop(request.isForce()); - } catch (ElasticsearchException ex) { - listener.onFailure(ex); - return; - } - listener.onResponse(new Response(Boolean.TRUE)); + transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap( + r -> { + try { + transformTask.stop(request.isForce(), request.isWaitForCheckpoint()); + listener.onResponse(new Response(true)); + } catch (ElasticsearchException ex) { + listener.onFailure(ex); + } + }, + e -> listener.onFailure( + new ElasticsearchStatusException( + "Failed to update transform task [{}] state value should_stop_at_checkpoint from [{}] to [{}]", + RestStatus.CONFLICT, + transformTask.getTransformId(), + transformTask.getState().shouldStopAtNextCheckpoint(), + request.isWaitForCheckpoint())) + ) + ); } else { listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId() + "] does not match request's ID [" + request.getId() + "]")); @@ -187,6 +199,13 @@ private ActionListener waitForStopListener(Request request, ActionList ); return ActionListener.wrap( response -> { + // If there were failures attempting to stop the tasks, we don't know if they will actually stop. + // It is better to respond to the user now than allow for the persistent task waiting to timeout + if (response.getTaskFailures().isEmpty() == false || response.getNodeFailures().isEmpty() == false) { + RestStatus status = firstNotOKStatus(response.getTaskFailures(), response.getNodeFailures()); + listener.onFailure(buildException(response.getTaskFailures(), response.getNodeFailures(), status)); + return; + } // Wait until the persistent task is stopped // Switch over to Generic threadpool so we don't block the network thread threadPool.generic().execute(() -> @@ -196,6 +215,46 @@ private ActionListener waitForStopListener(Request request, ActionList ); } + static ElasticsearchStatusException buildException(List taskOperationFailures, + List elasticsearchExceptions, + RestStatus status) { + List exceptions = Stream.concat( + taskOperationFailures.stream().map(TaskOperationFailure::getCause), + elasticsearchExceptions.stream()).collect(Collectors.toList()); + + ElasticsearchStatusException elasticsearchStatusException = + new ElasticsearchStatusException(exceptions.get(0).getMessage(), status); + + for (int i = 1; i < exceptions.size(); i++) { + elasticsearchStatusException.addSuppressed(exceptions.get(i)); + } + return elasticsearchStatusException; + } + + static RestStatus firstNotOKStatus(List taskOperationFailures, List exceptions) { + RestStatus status = RestStatus.OK; + + for (TaskOperationFailure taskOperationFailure : taskOperationFailures) { + status = taskOperationFailure.getStatus(); + if (RestStatus.OK.equals(status) == false) { + break; + } + } + if (status == RestStatus.OK) { + for (ElasticsearchException exception : exceptions) { + // As it stands right now, this will ALWAYS be INTERNAL_SERVER_ERROR. + // FailedNodeException does not overwrite the `status()` method and the logic in ElasticsearchException + // Just returns an INTERNAL_SERVER_ERROR + status = exception.status(); + if (RestStatus.OK.equals(status) == false) { + break; + } + } + } + // If all the previous exceptions don't have a valid status, we have an unknown error. + return status == RestStatus.OK ? RestStatus.INTERNAL_SERVER_ERROR : status; + } + private void waitForDataFrameStopped(Set persistentTaskIds, TimeValue timeout, boolean force, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/DataFrameInternalIndex.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/DataFrameInternalIndex.java index 4834df6158441..d10dea711d934 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/DataFrameInternalIndex.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/DataFrameInternalIndex.java @@ -52,10 +52,11 @@ public final class DataFrameInternalIndex { * progress::docs_processed, progress::docs_indexed, * stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed, * stats::exponential_avg_documents_processed + * version 3 (7.5): state::should_stop_at_checkpoint */ // constants for the index - public static final String INDEX_VERSION = "2"; + public static final String INDEX_VERSION = "3"; public static final String INDEX_PATTERN = ".data-frame-internal-"; public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION; public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME; @@ -80,6 +81,7 @@ public final class DataFrameInternalIndex { public static final String DOUBLE = "double"; public static final String LONG = "long"; public static final String KEYWORD = "keyword"; + public static final String BOOLEAN = "boolean"; public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException { IndexTemplateMetaData dataFrameTemplate = IndexTemplateMetaData.builder(LATEST_INDEX_VERSIONED_NAME) @@ -183,6 +185,9 @@ private static XContentBuilder addDataFrameTransformStoredDocMappings(XContentBu .startObject(TransformState.INDEXER_STATE.getPreferredName()) .field(TYPE, KEYWORD) .endObject() + .startObject(TransformState.SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName()) + .field(TYPE, BOOLEAN) + .endObject() .startObject(TransformState.CURRENT_POSITION.getPreferredName()) .field(ENABLED, false) .endObject() diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStopDataFrameTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStopDataFrameTransformAction.java index 40e6a6eee136a..5f7a8be6faf8c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStopDataFrameTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStopDataFrameTransformAction.java @@ -28,13 +28,14 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient boolean waitForCompletion = restRequest.paramAsBoolean(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), false); boolean force = restRequest.paramAsBoolean(TransformField.FORCE.getPreferredName(), false); boolean allowNoMatch = restRequest.paramAsBoolean(TransformField.ALLOW_NO_MATCH.getPreferredName(), false); - + boolean waitForCheckpoint = restRequest.paramAsBoolean(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), true); StopTransformAction.Request request = new StopTransformAction.Request(id, waitForCompletion, force, timeout, - allowNoMatch); + allowNoMatch, + waitForCheckpoint); return channel -> client.execute(StopTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientDataFrameIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientDataFrameIndexer.java index a32b5ff6abc57..744e35c2a678b 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientDataFrameIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientDataFrameIndexer.java @@ -64,6 +64,8 @@ class ClientDataFrameIndexer extends DataFrameIndexer { // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index private volatile String lastAuditedExceptionMessage = null; private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + private volatile boolean shouldStopAtCheckpoint = false; private volatile Instant changesLastDetectedAt; ClientDataFrameIndexer(DataFrameTransformsConfigManager transformsConfigManager, @@ -78,7 +80,8 @@ class ClientDataFrameIndexer extends DataFrameIndexer { TransformProgress transformProgress, TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint, - DataFrameTransformTask parentTask) { + DataFrameTransformTask parentTask, + boolean shouldStopAtCheckpoint) { super(ExceptionsHelper.requireNonNull(parentTask, "parentTask") .getThreadPool() .executor(ThreadPool.Names.GENERIC), @@ -97,6 +100,15 @@ class ClientDataFrameIndexer extends DataFrameIndexer { this.client = ExceptionsHelper.requireNonNull(client, "client"); this.transformTask = parentTask; this.failureCount = new AtomicInteger(0); + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + } + + boolean shouldStopAtCheckpoint() { + return shouldStopAtCheckpoint; + } + + void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) { + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; } @Override @@ -297,15 +309,21 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p return; } - // This means that the indexer was triggered to discover changes, found none, and exited early. - // If the state is `STOPPED` this means that DataFrameTransformTask#stop was called while we were checking for changes. - // Allow the stop call path to continue - if (hasSourceChanged == false && indexerState.equals(IndexerState.STOPPED) == false) { - next.run(); - return; - } - TransformTaskState taskState = transformTask.getTaskState(); + boolean shouldStopAtCheckpoint = shouldStopAtCheckpoint(); + + // If we should stop at the next checkpoint, are STARTED, and with `initialRun()` we are in one of two states + // 1. We have just called `onFinish` completing our request, but `shouldStopAtCheckpoint` was set to `true` before our check + // there and now + // 2. We are on the very first run of a NEW checkpoint and got here either through a failure, or the very first save state call. + // + // In either case, we should stop so that we guarantee a consistent state and that there are no partially completed checkpoints + if (shouldStopAtCheckpoint && initialRun() && indexerState.equals(IndexerState.STARTED)) { + indexerState = IndexerState.STOPPED; + auditor.info(transformConfig.getId(), "Data frame is no longer in the middle of a checkpoint, initiating stop."); + logger.info("[{}] data frame transform is no longer in the middle of a checkpoint, initiating stop.", + transformConfig.getId()); + } if (indexerState.equals(IndexerState.STARTED) && transformTask.getCheckpoint() == 1 @@ -317,10 +335,23 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p logger.info("[{}] data frame transform finished indexing all data, initiating stop.", transformConfig.getId()); } + // This means that the indexer was triggered to discover changes, found none, and exited early. + // If the state is `STOPPED` this means that DataFrameTransformTask#stop was called while we were checking for changes. + // Allow the stop call path to continue + if (hasSourceChanged == false && indexerState.equals(IndexerState.STOPPED) == false) { + next.run(); + return; + } + // If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING` // OR we called `doSaveState` manually as the indexer was not actively running. // Since we save the state to an index, we should make sure that our task state is in parity with the indexer state if (indexerState.equals(IndexerState.STOPPED)) { + // If we are going to stop after the state is saved, we should NOT persist `shouldStopAtCheckpoint: true` as this may + // cause problems if the task starts up again. + // Additionally, we don't have to worry about inconsistency with the ClusterState (if it is persisted there) as the + // when we stop, we mark the task as complete and that state goes away. + shouldStopAtCheckpoint = false; // We don't want adjust the stored taskState because as soon as it is `STOPPED` a user could call // .start again. taskState = TransformTaskState.STOPPED; @@ -332,9 +363,17 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p position, transformTask.getCheckpoint(), transformTask.getStateReason(), - getProgress()); + getProgress(), + null, + shouldStopAtCheckpoint); logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString()); + doSaveState(state, ActionListener.wrap( + r -> next.run(), + e -> next.run() + )); + } + protected void doSaveState(TransformState state, ActionListener listener) { // This could be `null` but the putOrUpdateTransformStoredDoc handles that case just fine SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = transformTask.getSeqNoPrimaryTermAndIndex(); @@ -342,50 +381,51 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p // called is controlled by AsyncTwoPhaseIndexer#onBulkResponse which calls doSaveState every so // often when doing bulk indexing calls or at the end of one indexing run. transformsConfigManager.putOrUpdateTransformStoredDoc( - new TransformStoredDoc(getJobId(), state, getStats()), - seqNoPrimaryTermAndIndex, - ActionListener.wrap( - r -> { - transformTask.updateSeqNoPrimaryTermAndIndex(seqNoPrimaryTermAndIndex, r); - // for auto stop shutdown the task - if (state.getTaskState().equals(TransformTaskState.STOPPED)) { - transformTask.shutdown(); - } - // Only do this clean up once, if it succeeded, no reason to do the query again. - if (oldStatsCleanedUp.compareAndSet(false, true)) { - transformsConfigManager.deleteOldTransformStoredDocuments(getJobId(), ActionListener.wrap( - nil -> { - logger.trace("[{}] deleted old transform stats and state document", getJobId()); - next.run(); - }, - e -> { - String msg = LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", - getJobId()); - logger.warn(msg, e); - // If we have failed, we should attempt the clean up again later - oldStatsCleanedUp.set(false); - next.run(); - } - )); - } else { - next.run(); - } - }, - statsExc -> { - logger.error(new ParameterizedMessage("[{}] updating stats of transform failed.", - transformConfig.getId()), - statsExc); - auditor.warning(getJobId(), - "Failure updating stats of transform: " + statsExc.getMessage()); - // for auto stop shutdown the task - if (state.getTaskState().equals(TransformTaskState.STOPPED)) { - transformTask.shutdown(); + new TransformStoredDoc(getJobId(), state, getStats()), + seqNoPrimaryTermAndIndex, + ActionListener.wrap( + r -> { + transformTask.updateSeqNoPrimaryTermAndIndex(seqNoPrimaryTermAndIndex, r); + // for auto stop shutdown the task + if (state.getTaskState().equals(TransformTaskState.STOPPED)) { + transformTask.shutdown(); + } + // Only do this clean up once, if it succeeded, no reason to do the query again. + if (oldStatsCleanedUp.compareAndSet(false, true)) { + transformsConfigManager.deleteOldTransformStoredDocuments(getJobId(), ActionListener.wrap( + nil -> { + logger.trace("[{}] deleted old transform stats and state document", getJobId()); + listener.onResponse(null); + }, + e -> { + String msg = LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", + getJobId()); + logger.warn(msg, e); + // If we have failed, we should attempt the clean up again later + oldStatsCleanedUp.set(false); + listener.onResponse(null); } - next.run(); - } - )); + )); + } else { + listener.onResponse(null); + } + }, + statsExc -> { + logger.error(new ParameterizedMessage("[{}] updating stats of transform failed.", + transformConfig.getId()), + statsExc); + auditor.warning(getJobId(), + "Failure updating stats of transform: " + statsExc.getMessage()); + // for auto stop shutdown the task + if (state.getTaskState().equals(TransformTaskState.STOPPED)) { + transformTask.shutdown(); + } + listener.onFailure(statsExc); + } + )); } + @Override protected void onFailure(Exception exc) { // the failure handler must not throw an exception due to internal problems @@ -404,6 +444,10 @@ protected void onFinish(ActionListener listener) { // This indicates an early exit since no changes were found. // So, don't treat this like a checkpoint being completed, as no work was done. if (hasSourceChanged == false) { + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + if (shouldStopAtCheckpoint) { + stop(); + } listener.onResponse(null); return; } @@ -447,6 +491,10 @@ protected void onFinish(ActionListener listener) { logger.debug( "[{}] finished indexing for data frame transform checkpoint [{}].", getJobId(), checkpoint); auditBulkFailures = true; + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + if (shouldStopAtCheckpoint) { + stop(); + } listener.onResponse(null); } catch (Exception e) { listener.onFailure(e); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientDataFrameIndexerBuilder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientDataFrameIndexerBuilder.java index f2557cc859061..32d982fb77a01 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientDataFrameIndexerBuilder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientDataFrameIndexerBuilder.java @@ -34,6 +34,7 @@ class ClientDataFrameIndexerBuilder { private TransformProgress progress; private TransformCheckpoint lastCheckpoint; private TransformCheckpoint nextCheckpoint; + private boolean shouldStopAtCheckpoint; ClientDataFrameIndexerBuilder() { this.initialStats = new TransformIndexerStats(); @@ -54,7 +55,13 @@ ClientDataFrameIndexer build(DataFrameTransformTask parentTask) { this.progress, this.lastCheckpoint, this.nextCheckpoint, - parentTask); + parentTask, + this.shouldStopAtCheckpoint); + } + + ClientDataFrameIndexerBuilder setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) { + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + return this; } ClientDataFrameIndexerBuilder setClient(Client client) { @@ -120,4 +127,4 @@ ClientDataFrameIndexerBuilder setNextCheckpoint(TransformCheckpoint nextCheckpoi this.nextCheckpoint = nextCheckpoint; return this; } -} \ No newline at end of file +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutor.java index 3d862d76b54e8..06c280623d25f 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -200,16 +200,19 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable Transform p // Since we have not set the value for this yet, it SHOULD be null buildTask.updateSeqNoPrimaryTermAndIndex(null, seqNoPrimaryTermAndIndex); logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString()); + TransformState transformState = stateAndStats.getTransformState(); indexerBuilder.setInitialStats(stateAndStats.getTransformStats()) .setInitialPosition(stateAndStats.getTransformState().getPosition()) .setProgress(stateAndStats.getTransformState().getProgress()) - .setIndexerState(currentIndexerState(stateAndStats.getTransformState())); + .setIndexerState(currentIndexerState(stateAndStats.getTransformState())) + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + .setShouldStopAtCheckpoint(transformState.shouldStopAtNextCheckpoint()); logger.debug("[{}] Loading existing state: [{}], position [{}]", transformId, stateAndStats.getTransformState(), stateAndStats.getTransformState().getPosition()); - stateHolder.set(stateAndStats.getTransformState()); + stateHolder.set(transformState); final long lastCheckpoint = stateHolder.get().getCheckpoint(); if (lastCheckpoint == 0) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformTask.java index ad8f3e6691a57..70fb8aabd0dd9 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformTask.java @@ -142,7 +142,9 @@ public TransformState getState() { initialPosition, currentCheckpoint.get(), stateReason.get(), - null); + null, + null, //Node attributes + false); } else { return new TransformState( taskState.get(), @@ -150,7 +152,9 @@ public TransformState getState() { indexer.get().getPosition(), currentCheckpoint.get(), stateReason.get(), - getIndexer().getProgress()); + getIndexer().getProgress(), + null, //Node attributes + indexer.get().shouldStopAtCheckpoint()); } } @@ -210,6 +214,15 @@ synchronized void start(Long startingCheckpoint, boolean force, boolean failOnCo RestStatus.CONFLICT)); return; } + // If we are already in a `STARTED` state, we should not attempt to call `.start` on the indexer again. + if (taskState.get() == TransformTaskState.STARTED) { + listener.onFailure(new ElasticsearchStatusException( + "Cannot start transform [{}] as it is already STARTED.", + RestStatus.CONFLICT, + getTransformId() + )); + return; + } if (getIndexer() == null) { // If our state is failed AND the indexer is null, the user needs to _stop?force=true so that the indexer gets // fully initialized. @@ -232,6 +245,12 @@ synchronized void start(Long startingCheckpoint, boolean force, boolean failOnCo )); return; } + + if (getIndexer().shouldStopAtCheckpoint()) { + listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], " + + "because it is stopping at the next checkpoint.", transform.getId())); + return; + } final IndexerState newState = getIndexer().start(); if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) { listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]", @@ -288,8 +307,63 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis start(startingCheckpoint, force, true, listener); } - public synchronized void stop(boolean force) { - logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState()); + /** + * This sets the flag for the task to stop at the next checkpoint. + * + * If first persists the flag to cluster state, and then mutates the local variable. + * + * It only persists to cluster state if the value is different than what is currently held in memory. + * @param shouldStopAtCheckpoint whether or not we should stop at the next checkpoint or not + * @param shouldStopAtCheckpointListener the listener to return to when we have persisted the updated value to the state index. + */ + public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, + ActionListener shouldStopAtCheckpointListener) { + logger.debug("[{}] attempted to set task to stop at checkpoint [{}] with state [{}]", + getTransformId(), + shouldStopAtCheckpoint, + getState()); + if (taskState.get() == TransformTaskState.STARTED == false || + getIndexer() == null || + getIndexer().shouldStopAtCheckpoint() == shouldStopAtCheckpoint || + getIndexer().getState() == IndexerState.STOPPED || + getIndexer().getState() == IndexerState.STOPPING) { + shouldStopAtCheckpointListener.onResponse(null); + return; + } + TransformState state = new TransformState( + taskState.get(), + indexer.get().getState(), + indexer.get().getPosition(), + currentCheckpoint.get(), + stateReason.get(), + getIndexer().getProgress(), + null, //Node attributes + shouldStopAtCheckpoint); + getIndexer().doSaveState(state, + ActionListener.wrap( + r -> { + // We only want to update this internal value if it is persisted as such + getIndexer().setShouldStopAtCheckpoint(shouldStopAtCheckpoint); + logger.debug("[{}] successfully persisted should_stop_at_checkpoint update [{}]", + getTransformId(), + shouldStopAtCheckpoint); + shouldStopAtCheckpointListener.onResponse(null); + }, + statsExc -> { + logger.warn("[{}] failed to persist should_stop_at_checkpoint update [{}]", + getTransformId(), + shouldStopAtCheckpoint); + shouldStopAtCheckpointListener.onFailure(statsExc); + } + )); + } + + public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) { + logger.debug("[{}] stop called with force [{}], shouldStopAtCheckpoint [{}], state [{}]", + getTransformId(), + force, + shouldStopAtCheckpoint, + getState()); if (getIndexer() == null) { // If there is no indexer the task has not been triggered // but it still needs to be stopped and removed @@ -309,16 +383,26 @@ public synchronized void stop(boolean force) { RestStatus.CONFLICT); } - IndexerState state = getIndexer().stop(); stateReason.set(null); // No reason to keep it in the potentially failed state. - // Since we have called `stop` against the indexer, we have no more fear of triggering again. - // But, since `doSaveState` is asynchronous, it is best to set the state as STARTED so that another `start` call cannot be + // Since `doSaveState` is asynchronous, it is best to set the state as STARTED so that another `start` call cannot be // executed while we are wrapping up. - taskState.compareAndSet(TransformTaskState.FAILED, TransformTaskState.STARTED); - if (state == IndexerState.STOPPED) { - getIndexer().onStop(); - getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {}); + boolean wasFailed = taskState.compareAndSet(TransformTaskState.FAILED, TransformTaskState.STARTED); + + // shouldStopAtCheckpoint only comes into play when onFinish is called (or doSaveState right after). + // if it is false, stop immediately + if (shouldStopAtCheckpoint == false || + // If state was in a failed state, we should stop immediately as we will never reach the next checkpoint + wasFailed || + // If the indexerState is STARTED and it is on an initialRun, that means that the indexer has previously finished a checkpoint, + // or has yet to even start one. + // Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time). + (getIndexer().getState() == IndexerState.STARTED && getIndexer().initialRun())) { + IndexerState state = getIndexer().stop(); + if (state == IndexerState.STOPPED) { + getIndexer().onStop(); + getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {}); + } } } @@ -413,6 +497,12 @@ synchronized void markAsFailed(String reason, ActionListener listener) { // We should not keep retrying. Either the task will be stopped, or started // If it is started again, it is registered again. deregisterSchedulerJob(); + // The idea of stopping at the next checkpoint is no longer valid. Since a failed task could potentially START again, + // we should set this flag to false. + if (getIndexer() != null) { + getIndexer().setShouldStopAtCheckpoint(false); + } + // The end user should see that the task is in a failed state, and attempt to stop it again but with force=true taskState.set(TransformTaskState.FAILED); stateReason.set(reason); TransformState newState = getState(); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopDataFrameTransformActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopDataFrameTransformActionTests.java index 5de09509d7ead..1e16cea345a8d 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopDataFrameTransformActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopDataFrameTransformActionTests.java @@ -5,12 +5,15 @@ */ package org.elasticsearch.xpack.transform.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; +import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformMessages; @@ -18,8 +21,10 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.elasticsearch.rest.RestStatus.CONFLICT; import static org.hamcrest.Matchers.equalTo; @@ -91,4 +96,67 @@ public void testTaskStateValidationWithDataFrameTasks() { "task has failed"))); } + public void testFirstNotOKStatus() { + List nodeFailures = new ArrayList<>(); + List taskOperationFailures = new ArrayList<>(); + + nodeFailures.add(new ElasticsearchException("nodefailure", + new ElasticsearchStatusException("failure", RestStatus.UNPROCESSABLE_ENTITY))); + taskOperationFailures.add(new TaskOperationFailure("node", + 1, + new ElasticsearchStatusException("failure", RestStatus.BAD_REQUEST))); + + assertThat(TransportStopDataFrameTransformAction.firstNotOKStatus(Collections.emptyList(), Collections.emptyList()), + equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + + assertThat(TransportStopDataFrameTransformAction.firstNotOKStatus(taskOperationFailures, Collections.emptyList()), + equalTo(RestStatus.BAD_REQUEST)); + assertThat(TransportStopDataFrameTransformAction.firstNotOKStatus(taskOperationFailures, nodeFailures), + equalTo(RestStatus.BAD_REQUEST)); + assertThat(TransportStopDataFrameTransformAction.firstNotOKStatus(taskOperationFailures, + Collections.singletonList(new ElasticsearchException(new ElasticsearchStatusException("not failure", RestStatus.OK)))), + equalTo(RestStatus.BAD_REQUEST)); + + assertThat(TransportStopDataFrameTransformAction.firstNotOKStatus( + Collections.singletonList(new TaskOperationFailure( + "node", + 1, + new ElasticsearchStatusException("not failure", RestStatus.OK))), + nodeFailures), + equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + + assertThat(TransportStopDataFrameTransformAction.firstNotOKStatus( + Collections.emptyList(), + nodeFailures), + equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + } + + public void testBuildException() { + List nodeFailures = new ArrayList<>(); + List taskOperationFailures = new ArrayList<>(); + + nodeFailures.add(new ElasticsearchException("node failure")); + taskOperationFailures.add(new TaskOperationFailure("node", + 1, + new ElasticsearchStatusException("task failure", RestStatus.BAD_REQUEST))); + + RestStatus status = CONFLICT; + ElasticsearchStatusException statusException = + TransportStopDataFrameTransformAction.buildException(taskOperationFailures, nodeFailures, status); + + assertThat(statusException.status(), equalTo(status)); + assertThat(statusException.getMessage(), equalTo(taskOperationFailures.get(0).getCause().getMessage())); + assertThat(statusException.getSuppressed().length, equalTo(1)); + + statusException = TransportStopDataFrameTransformAction.buildException(Collections.emptyList(), nodeFailures, status); + assertThat(statusException.status(), equalTo(status)); + assertThat(statusException.getMessage(), equalTo(nodeFailures.get(0).getMessage())); + assertThat(statusException.getSuppressed().length, equalTo(0)); + + statusException = TransportStopDataFrameTransformAction.buildException(taskOperationFailures, Collections.emptyList(), status); + assertThat(statusException.status(), equalTo(status)); + assertThat(statusException.getMessage(), equalTo(taskOperationFailures.get(0).getCause().getMessage())); + assertThat(statusException.getSuppressed().length, equalTo(0)); + } + } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientDataFrameIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientDataFrameIndexerTests.java index 0745af2a97fbd..ebf5a3f17dee6 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientDataFrameIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientDataFrameIndexerTests.java @@ -67,7 +67,8 @@ public void testAudiOnFinishFrequency() { 2L, Collections.emptyMap(), Instant.now().toEpochMilli()), - parentTask); + parentTask, + false); List shouldAudit = IntStream.range(0, 100_000).boxed().map(indexer::shouldAuditOnFinish).collect(Collectors.toList());