Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML][Data Frame] add support for wait_for_checkpoint flag on _stop API #45469

Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
195d0f3
[ML][Data Frame] add support for wait_for_checkpoint flag on _stop
benwtrent Aug 9, 2019
c8c56f3
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
elasticmachine Aug 13, 2019
1356bfd
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Aug 16, 2019
c08deb5
Merge branch 'feature/ml-df-add-wait_for_checkpoint-flag' of github.c…
benwtrent Aug 19, 2019
ea4cd47
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Aug 19, 2019
d87f3ac
intermediate commit
benwtrent Aug 19, 2019
5482584
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Aug 28, 2019
a8dcefb
minor fix
benwtrent Aug 28, 2019
b467f22
further bug fixes and race condition fixes
benwtrent Aug 28, 2019
8ef6160
addressing formatting concerns
benwtrent Aug 28, 2019
885aed2
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Sep 3, 2019
0acffb3
moving to save state into index and not cluster state
benwtrent Sep 3, 2019
10bd717
adding waitforcheckpoint integration test;
benwtrent Sep 3, 2019
ffd8acb
fixing stop logic
benwtrent Sep 5, 2019
33a4096
fixing request name in test
benwtrent Sep 5, 2019
0ef6e34
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Sep 5, 2019
e8db75c
dont wait for tasks completion if there are failures
benwtrent Sep 5, 2019
4a83849
Building intelligible error from failures, fixing yml tests
benwtrent Sep 6, 2019
59f509c
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Sep 6, 2019
58d1336
updating assertion
benwtrent Sep 6, 2019
ba99096
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Sep 12, 2019
e3e1c50
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Sep 16, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest.FORCE;
import static org.elasticsearch.client.dataframe.GetDataFrameTransformRequest.ALLOW_NO_MATCH;
import static org.elasticsearch.client.dataframe.PutDataFrameTransformRequest.DEFER_VALIDATION;
import static org.elasticsearch.client.dataframe.StopDataFrameTransformRequest.WAIT_FOR_CHECKPOINT;

final class DataFrameRequestConverters {

Expand Down Expand Up @@ -135,6 +136,9 @@ static Request stopDataFrameTransform(StopDataFrameTransformRequest stopRequest)
if (stopRequest.getAllowNoMatch() != null) {
request.addParameter(ALLOW_NO_MATCH, stopRequest.getAllowNoMatch().toString());
}
if (stopRequest.getWaitForCheckpoint() != null) {
request.addParameter(WAIT_FOR_CHECKPOINT, stopRequest.getWaitForCheckpoint().toString());
}
request.addParameters(params.asMap());
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@

public class StopDataFrameTransformRequest implements Validatable {

public static final String WAIT_FOR_CHECKPOINT = "wait_for_checkpoint";

private final String id;
private Boolean waitForCompletion;
private TimeValue timeout;
private Boolean allowNoMatch;
private Boolean waitForCheckpoint;

public StopDataFrameTransformRequest(String id) {
this.id = id;
waitForCompletion = null;
timeout = null;
this(id, null, null, null);
}

public StopDataFrameTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout) {
public StopDataFrameTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout, Boolean waitForCheckpoint) {
this.id = id;
this.waitForCompletion = waitForCompletion;
this.timeout = timeout;
this.waitForCheckpoint = waitForCheckpoint;
}

public String getId() {
Expand Down Expand Up @@ -73,6 +75,14 @@ public void setAllowNoMatch(Boolean allowNoMatch) {
this.allowNoMatch = allowNoMatch;
}

public Boolean getWaitForCheckpoint() {
return waitForCheckpoint;
}

public void setWaitForCheckpoint(Boolean waitForCheckpoint) {
this.waitForCheckpoint = waitForCheckpoint;
}

@Override
public Optional<ValidationException> validate() {
if (id == null) {
Expand All @@ -86,7 +96,7 @@ public Optional<ValidationException> validate() {

@Override
public int hashCode() {
return Objects.hash(id, waitForCompletion, timeout, allowNoMatch);
return Objects.hash(id, waitForCompletion, timeout, allowNoMatch, waitForCheckpoint);
}

@Override
Expand All @@ -102,7 +112,8 @@ public boolean equals(Object obj) {
return Objects.equals(this.id, other.id)
&& Objects.equals(this.waitForCompletion, other.waitForCompletion)
&& Objects.equals(this.timeout, other.timeout)
&& Objects.equals(this.allowNoMatch, other.allowNoMatch);
&& Objects.equals(this.allowNoMatch, other.allowNoMatch)
&& Objects.equals(this.waitForCheckpoint, other.waitForCheckpoint);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,13 @@ public void testStopDataFrameTransform() {
if (randomBoolean()) {
timeValue = TimeValue.parseTimeValue(randomTimeValue(), "timeout");
}
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, waitForCompletion, timeValue);

Boolean waitForCheckpoint = null;
if (randomBoolean()) {
waitForCheckpoint = randomBoolean();
}

StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, waitForCompletion, timeValue, waitForCheckpoint);

Request request = DataFrameRequestConverters.stopDataFrameTransform(stopRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
Expand All @@ -169,6 +175,13 @@ public void testStopDataFrameTransform() {
assertFalse(request.getParameters().containsKey("timeout"));
}

if (waitForCheckpoint != null) {
assertTrue(request.getParameters().containsKey("wait_for_checkpoint"));
assertEquals(stopRequest.getWaitForCheckpoint(), Boolean.parseBoolean(request.getParameters().get("wait_for_checkpoint")));
} else {
assertFalse(request.getParameters().containsKey("wait_for_checkpoint"));
}

assertFalse(request.getParameters().containsKey(ALLOW_NO_MATCH));
stopRequest.setAllowNoMatch(randomBoolean());
request = DataFrameRequestConverters.stopDataFrameTransform(stopRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void indexData(String indexName) throws IOException {
public void cleanUpTransforms() throws Exception {
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().stopDataFrameTransform(
new StopDataFrameTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT);
new StopDataFrameTransformRequest(transformId, true, null, false), RequestOptions.DEFAULT);
}

for (String transformId : transformsToClean) {
Expand Down Expand Up @@ -310,7 +310,7 @@ public void testStartStop() throws IOException {
assertThat(taskState, oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING,
DataFrameTransformStats.State.STOPPING, DataFrameTransformStats.State.STOPPED));

StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, true, null, false);
StopDataFrameTransformResponse stopResponse =
execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync);
assertTrue(stopResponse.isAcknowledged());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
public void cleanUpTransforms() throws Exception {
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().stopDataFrameTransform(
new StopDataFrameTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT);
new StopDataFrameTransformRequest(transformId, true, TimeValue.timeValueSeconds(20), false), RequestOptions.DEFAULT);
}

for (String transformId : transformsToClean) {
Expand Down
5 changes: 5 additions & 0 deletions docs/reference/data-frames/apis/stop-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ are no matches or only partial matches.
state completely stops. If set to `false`, the API returns immediately and the
indexer will be stopped asynchronously in the background. Defaults to `false`.

`wait_for_checkpoint`::
(Optional, boolean) If set to `true`, the transform will not completely stop
until the current checkpoint is completed. If set to `false`, the transform
stops as soon as possible. Defaults to `true`.

[[stop-data-frame-transform-response-codes]]
==== {api-response-codes-title}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public final class DataFrameField {
public static final ParseField GROUP_BY = new ParseField("group_by");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");
public static final ParseField WAIT_FOR_CHECKPOINT = new ParseField("wait_for_checkpoint");
public static final ParseField STATS_FIELD = new ParseField("stats");
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
public static final ParseField SOURCE = new ParseField("source");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,25 @@ private StopDataFrameTransformAction() {
public static class Request extends BaseTasksRequest<Request> {
private final String id;
private final boolean waitForCompletion;
private final boolean waitForCheckpoint;
private final boolean force;
private final boolean allowNoMatch;
private Set<String> expandedIds;

public Request(String id, boolean waitForCompletion, boolean force, @Nullable TimeValue timeout, boolean allowNoMatch) {
public Request(String id,
boolean waitForCompletion,
boolean force,
@Nullable TimeValue timeout,
boolean allowNoMatch,
boolean waitForCheckpoint) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
this.waitForCompletion = waitForCompletion;
this.force = force;

// use the timeout value already present in BaseTasksRequest
this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout);
this.allowNoMatch = allowNoMatch;
this.waitForCheckpoint = waitForCheckpoint;
}

public Request(StreamInput in) throws IOException {
Expand All @@ -73,6 +80,11 @@ public Request(StreamInput in) throws IOException {
} else {
this.allowNoMatch = true;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
this.waitForCheckpoint = in.readBoolean();
} else {
this.waitForCheckpoint = true;
}
}

public String getId() {
Expand All @@ -99,6 +111,10 @@ public boolean isAllowNoMatch() {
return allowNoMatch;
}

public boolean isWaitForCheckpoint() {
return waitForCheckpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand All @@ -113,6 +129,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeBoolean(allowNoMatch);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(waitForCheckpoint);
}
}

@Override
Expand All @@ -123,7 +142,7 @@ public ActionRequestValidationException validate() {
@Override
public int hashCode() {
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch);
return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch, waitForCheckpoint);
}

@Override
Expand All @@ -146,7 +165,8 @@ public boolean equals(Object obj) {
Objects.equals(waitForCompletion, other.waitForCompletion) &&
Objects.equals(force, other.force) &&
Objects.equals(expandedIds, other.expandedIds) &&
allowNoMatch == other.allowNoMatch;
allowNoMatch == other.allowNoMatch &&
waitForCheckpoint == other.waitForCheckpoint;
}

@Override
Expand All @@ -157,7 +177,6 @@ public boolean match(Task task) {
return expandedIds.contains(id);
}
}

return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
@Nullable
private NodeAttributes node;

// TODO: 8.x this needs to be deprecated and we move towards a STOPPING TASK_STATE
private final boolean shouldStopAtNextCheckpoint;

public static final ParseField TASK_STATE = new ParseField("task_state");
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");

Expand All @@ -53,6 +56,8 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
public static final ParseField REASON = new ParseField("reason");
public static final ParseField PROGRESS = new ParseField("progress");
public static final ParseField NODE = new ParseField("node");
public static final ParseField SHOULD_STOP_AT_NEXT_CHECKPOINT = new ParseField("should_stop_at_checkpoint");


@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,
Expand All @@ -72,8 +77,16 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
String reason = (String) args[5];
DataFrameTransformProgress progress = (DataFrameTransformProgress) args[6];
NodeAttributes node = (NodeAttributes) args[7];

return new DataFrameTransformState(taskState, indexerState, dataFrameIndexerPosition, checkpoint, reason, progress, node);
boolean shouldStopAtNextCheckpoint = args[8] == null ? false : (boolean)args[8];

return new DataFrameTransformState(taskState,
indexerState,
dataFrameIndexerPosition,
checkpoint,
reason,
progress,
node,
shouldStopAtNextCheckpoint);
});

static {
Expand All @@ -85,6 +98,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
PARSER.declareString(optionalConstructorArg(), REASON);
PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT);
PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE, ValueType.OBJECT);
PARSER.declareBoolean(optionalConstructorArg(), SHOULD_STOP_AT_NEXT_CHECKPOINT);
}

public DataFrameTransformState(DataFrameTransformTaskState taskState,
Expand All @@ -93,14 +107,26 @@ public DataFrameTransformState(DataFrameTransformTaskState taskState,
long checkpoint,
@Nullable String reason,
@Nullable DataFrameTransformProgress progress,
@Nullable NodeAttributes node) {
@Nullable NodeAttributes node,
boolean shouldStopAtNextCheckpoint) {
this.taskState = taskState;
this.indexerState = indexerState;
this.position = position;
this.checkpoint = checkpoint;
this.reason = reason;
this.progress = progress;
this.node = node;
this.shouldStopAtNextCheckpoint = shouldStopAtNextCheckpoint;
}

public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable DataFrameIndexerPosition position,
long checkpoint,
@Nullable String reason,
@Nullable DataFrameTransformProgress progress,
@Nullable NodeAttributes node) {
this(taskState, indexerState, position, checkpoint, reason, progress, node, false);
}

public DataFrameTransformState(DataFrameTransformTaskState taskState,
Expand Down Expand Up @@ -129,6 +155,11 @@ public DataFrameTransformState(StreamInput in) throws IOException {
} else {
node = null;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
shouldStopAtNextCheckpoint = in.readBoolean();
} else {
shouldStopAtNextCheckpoint = false;
}
}

public DataFrameTransformTaskState getTaskState() {
Expand Down Expand Up @@ -164,6 +195,10 @@ public DataFrameTransformState setNode(NodeAttributes node) {
return this;
}

public boolean shouldStopAtNextCheckpoint() {
return shouldStopAtNextCheckpoint;
}

public static DataFrameTransformState fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
Expand All @@ -190,6 +225,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (node != null) {
builder.field(NODE.getPreferredName(), node);
}
builder.field(SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName(), shouldStopAtNextCheckpoint);
builder.endObject();
return builder;
}
Expand All @@ -214,6 +250,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeOptionalWriteable(node);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(shouldStopAtNextCheckpoint);
}
}

@Override
Expand All @@ -234,12 +273,13 @@ public boolean equals(Object other) {
this.checkpoint == that.checkpoint &&
Objects.equals(this.reason, that.reason) &&
Objects.equals(this.progress, that.progress) &&
Objects.equals(this.shouldStopAtNextCheckpoint, that.shouldStopAtNextCheckpoint) &&
Objects.equals(this.node, that.node);
}

@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node);
return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node, shouldStopAtNextCheckpoint);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Locale;

public enum DataFrameTransformTaskState implements Writeable {
// TODO 8.x add a `STOPPING` state and BWC handling in ::fromString
STOPPED, STARTED, FAILED;

public static DataFrameTransformTaskState fromString(String name) {
Expand Down
Loading