Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML][Data Frame] add support for wait_for_checkpoint flag on _stop API #45469

Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
195d0f3
[ML][Data Frame] add support for wait_for_checkpoint flag on _stop
benwtrent Aug 9, 2019
c8c56f3
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
elasticmachine Aug 13, 2019
1356bfd
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Aug 16, 2019
c08deb5
Merge branch 'feature/ml-df-add-wait_for_checkpoint-flag' of github.c…
benwtrent Aug 19, 2019
ea4cd47
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Aug 19, 2019
d87f3ac
intermediate commit
benwtrent Aug 19, 2019
5482584
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Aug 28, 2019
a8dcefb
minor fix
benwtrent Aug 28, 2019
b467f22
further bug fixes and race condition fixes
benwtrent Aug 28, 2019
8ef6160
addressing formatting concerns
benwtrent Aug 28, 2019
885aed2
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Sep 3, 2019
0acffb3
moving to save state into index and not cluster state
benwtrent Sep 3, 2019
10bd717
adding waitforcheckpoint integration test;
benwtrent Sep 3, 2019
ffd8acb
fixing stop logic
benwtrent Sep 5, 2019
33a4096
fixing request name in test
benwtrent Sep 5, 2019
0ef6e34
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Sep 5, 2019
e8db75c
dont wait for tasks completion if there are failures
benwtrent Sep 5, 2019
4a83849
Building intelligible error from failures, fixing yml tests
benwtrent Sep 6, 2019
59f509c
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Sep 6, 2019
58d1336
updating assertion
benwtrent Sep 6, 2019
ba99096
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Sep 12, 2019
e3e1c50
Merge branch 'master' into feature/ml-df-add-wait_for_checkpoint-flag
benwtrent Sep 16, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest.FORCE;
import static org.elasticsearch.client.dataframe.GetDataFrameTransformRequest.ALLOW_NO_MATCH;
import static org.elasticsearch.client.dataframe.PutDataFrameTransformRequest.DEFER_VALIDATION;
import static org.elasticsearch.client.dataframe.StopDataFrameTransformRequest.WAIT_FOR_CHECKPOINT;

final class DataFrameRequestConverters {

Expand Down Expand Up @@ -135,6 +136,9 @@ static Request stopDataFrameTransform(StopDataFrameTransformRequest stopRequest)
if (stopRequest.getAllowNoMatch() != null) {
request.addParameter(ALLOW_NO_MATCH, stopRequest.getAllowNoMatch().toString());
}
if (stopRequest.getWaitForCheckpoint() != null) {
request.addParameter(WAIT_FOR_CHECKPOINT, stopRequest.getWaitForCheckpoint().toString());
}
request.addParameters(params.asMap());
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@

public class StopDataFrameTransformRequest implements Validatable {

public static final String WAIT_FOR_CHECKPOINT = "wait_for_checkpoint";

private final String id;
private Boolean waitForCompletion;
private TimeValue timeout;
private Boolean allowNoMatch;
private Boolean waitForCheckpoint;

public StopDataFrameTransformRequest(String id) {
this.id = id;
waitForCompletion = null;
timeout = null;
this(id, null, null, null);
}

public StopDataFrameTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout) {
public StopDataFrameTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout, Boolean waitForCheckpoint) {
this.id = id;
this.waitForCompletion = waitForCompletion;
this.timeout = timeout;
this.waitForCheckpoint = waitForCheckpoint;
}

public String getId() {
Expand Down Expand Up @@ -73,6 +75,14 @@ public void setAllowNoMatch(Boolean allowNoMatch) {
this.allowNoMatch = allowNoMatch;
}

/**
 * Returns the {@code wait_for_checkpoint} flag held by this stop request.
 *
 * @return the flag value, or {@code null} when no value has been supplied
 */
public Boolean getWaitForCheckpoint() {
return waitForCheckpoint;
}

/**
 * Sets the {@code wait_for_checkpoint} flag on this stop request.
 *
 * @param waitForCheckpoint the new flag value; may be {@code null}
 */
public void setWaitForCheckpoint(Boolean waitForCheckpoint) {
this.waitForCheckpoint = waitForCheckpoint;
}

@Override
public Optional<ValidationException> validate() {
if (id == null) {
Expand All @@ -86,7 +96,7 @@ public Optional<ValidationException> validate() {

@Override
public int hashCode() {
return Objects.hash(id, waitForCompletion, timeout, allowNoMatch);
return Objects.hash(id, waitForCompletion, timeout, allowNoMatch, waitForCheckpoint);
}

@Override
Expand All @@ -102,7 +112,8 @@ public boolean equals(Object obj) {
return Objects.equals(this.id, other.id)
&& Objects.equals(this.waitForCompletion, other.waitForCompletion)
&& Objects.equals(this.timeout, other.timeout)
&& Objects.equals(this.allowNoMatch, other.allowNoMatch);
&& Objects.equals(this.allowNoMatch, other.allowNoMatch)
&& Objects.equals(this.waitForCheckpoint, other.waitForCheckpoint);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,13 @@ public void testStopDataFrameTransform() {
if (randomBoolean()) {
timeValue = TimeValue.parseTimeValue(randomTimeValue(), "timeout");
}
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, waitForCompletion, timeValue);

Boolean waitForCheckpoint = null;
if (randomBoolean()) {
waitForCheckpoint = randomBoolean();
}

StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, waitForCompletion, timeValue, waitForCheckpoint);

Request request = DataFrameRequestConverters.stopDataFrameTransform(stopRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
Expand All @@ -169,6 +175,13 @@ public void testStopDataFrameTransform() {
assertFalse(request.getParameters().containsKey("timeout"));
}

if (waitForCheckpoint != null) {
assertTrue(request.getParameters().containsKey("wait_for_checkpoint"));
assertEquals(stopRequest.getWaitForCheckpoint(), Boolean.parseBoolean(request.getParameters().get("wait_for_checkpoint")));
} else {
assertFalse(request.getParameters().containsKey("wait_for_checkpoint"));
}

assertFalse(request.getParameters().containsKey(ALLOW_NO_MATCH));
stopRequest.setAllowNoMatch(randomBoolean());
request = DataFrameRequestConverters.stopDataFrameTransform(stopRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void indexData(String indexName) throws IOException {
public void cleanUpTransforms() throws Exception {
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().stopDataFrameTransform(
new StopDataFrameTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT);
new StopDataFrameTransformRequest(transformId, true, null, false), RequestOptions.DEFAULT);
}

for (String transformId : transformsToClean) {
Expand Down Expand Up @@ -310,7 +310,7 @@ public void testStartStop() throws IOException {
assertThat(taskState, oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING,
DataFrameTransformStats.State.STOPPING, DataFrameTransformStats.State.STOPPED));

StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, true, null, false);
StopDataFrameTransformResponse stopResponse =
execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync);
assertTrue(stopResponse.isAcknowledged());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
public void cleanUpTransforms() throws Exception {
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().stopDataFrameTransform(
new StopDataFrameTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT);
new StopDataFrameTransformRequest(transformId, true, TimeValue.timeValueSeconds(20), false), RequestOptions.DEFAULT);
}

for (String transformId : transformsToClean) {
Expand Down
5 changes: 5 additions & 0 deletions docs/reference/data-frames/apis/stop-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ are no matches or only partial matches.
state completely stops. If set to `false`, the API returns immediately and the
indexer will be stopped asynchronously in the background. Defaults to `false`.

`wait_for_checkpoint`::
(Optional, boolean) If set to `true`, the transform will not completely stop
until the current checkpoint is completed. If set to `false`, the transform
stops as soon as possible. Defaults to `true`.

[[stop-data-frame-transform-response-codes]]
==== {api-response-codes-title}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public final class DataFrameField {
public static final ParseField GROUP_BY = new ParseField("group_by");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");
public static final ParseField WAIT_FOR_CHECKPOINT = new ParseField("wait_for_checkpoint");
public static final ParseField STATS_FIELD = new ParseField("stats");
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
public static final ParseField SOURCE = new ParseField("source");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,25 @@ private StopDataFrameTransformAction() {
public static class Request extends BaseTasksRequest<Request> {
private final String id;
private final boolean waitForCompletion;
private final boolean waitForCheckpoint;
private final boolean force;
private final boolean allowNoMatch;
private Set<String> expandedIds;

public Request(String id, boolean waitForCompletion, boolean force, @Nullable TimeValue timeout, boolean allowNoMatch) {
public Request(String id,
boolean waitForCompletion,
boolean force,
@Nullable TimeValue timeout,
boolean allowNoMatch,
boolean waitForCheckpoint) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
this.waitForCompletion = waitForCompletion;
this.force = force;

// use the timeout value already present in BaseTasksRequest
this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout);
this.allowNoMatch = allowNoMatch;
this.waitForCheckpoint = waitForCheckpoint;
}

public Request(StreamInput in) throws IOException {
Expand All @@ -73,6 +80,11 @@ public Request(StreamInput in) throws IOException {
} else {
this.allowNoMatch = true;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
this.waitForCheckpoint = in.readBoolean();
} else {
this.waitForCheckpoint = true;
}
}

public String getId() {
Expand All @@ -99,6 +111,10 @@ public boolean isAllowNoMatch() {
return allowNoMatch;
}

/**
 * Returns the wait-for-checkpoint flag carried by this request.
 *
 * @return the value of the {@code waitForCheckpoint} field
 */
public boolean isWaitForCheckpoint() {
return waitForCheckpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand All @@ -113,6 +129,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeBoolean(allowNoMatch);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(waitForCheckpoint);
}
}

@Override
Expand All @@ -123,7 +142,7 @@ public ActionRequestValidationException validate() {
@Override
public int hashCode() {
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch);
return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch, waitForCheckpoint);
}

@Override
Expand All @@ -146,7 +165,8 @@ public boolean equals(Object obj) {
Objects.equals(waitForCompletion, other.waitForCompletion) &&
Objects.equals(force, other.force) &&
Objects.equals(expandedIds, other.expandedIds) &&
allowNoMatch == other.allowNoMatch;
allowNoMatch == other.allowNoMatch &&
waitForCheckpoint == other.waitForCheckpoint;
}

@Override
Expand All @@ -157,7 +177,6 @@ public boolean match(Task task) {
return expandedIds.contains(id);
}
}

return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
@Nullable
private NodeAttributes node;

// TODO: 8.x this needs to be deprecated and we move towards a STOPPING TASK_STATE
private boolean shouldStopAtCheckpoint;

public static final ParseField TASK_STATE = new ParseField("task_state");
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");

Expand All @@ -54,6 +57,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
public static final ParseField PROGRESS = new ParseField("progress");
public static final ParseField NODE = new ParseField("node");


@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,
true,
Expand Down Expand Up @@ -93,14 +97,26 @@ public DataFrameTransformState(DataFrameTransformTaskState taskState,
long checkpoint,
@Nullable String reason,
@Nullable DataFrameTransformProgress progress,
@Nullable NodeAttributes node) {
@Nullable NodeAttributes node,
boolean shouldStopAtCheckpoint) {
this.taskState = taskState;
this.indexerState = indexerState;
this.position = position;
this.checkpoint = checkpoint;
this.reason = reason;
this.progress = progress;
this.node = node;
this.shouldStopAtCheckpoint = shouldStopAtCheckpoint;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not included in the X-Content representation. Is there a good reason for that?

I guess it's because we don't want this to go into the version of this object that gets persisted to the index as part of a DataFrameTransformStoredDoc? But omitting it from the X-Content representation also means it won't survive in cluster state if there's a full cluster restart.

There are other comments saying that in 8.x we want to replace this with a STOPPING enum value. But that would be persisted both in the DataFrameTransformStoredDoc and in the on-disk version of the cluster state. So there's an inconsistency here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@droberts195
If the cluster state stores itself as XContent, how does it know how to deserialize the objects?

Also, if we are going to store this in the index, we may just want to bite the bullet and figure out how to ONLY store it in the index.

}

/**
 * Convenience constructor that omits the {@code shouldStopAtCheckpoint} argument.
 * Delegates to the eight-argument constructor, passing {@code false} for
 * {@code shouldStopAtCheckpoint}; all other arguments are forwarded unchanged.
 */
public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable DataFrameIndexerPosition position,
long checkpoint,
@Nullable String reason,
@Nullable DataFrameTransformProgress progress,
@Nullable NodeAttributes node) {
this(taskState, indexerState, position, checkpoint, reason, progress, node, false);
}

public DataFrameTransformState(DataFrameTransformTaskState taskState,
Expand Down Expand Up @@ -129,6 +145,9 @@ public DataFrameTransformState(StreamInput in) throws IOException {
} else {
node = null;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
shouldStopAtCheckpoint = in.readBoolean();
}
}

public DataFrameTransformTaskState getTaskState() {
Expand Down Expand Up @@ -164,6 +183,14 @@ public DataFrameTransformState setNode(NodeAttributes node) {
return this;
}

/**
 * Returns the {@code shouldStopAtCheckpoint} flag held by this state object.
 *
 * @return the current value of the {@code shouldStopAtCheckpoint} field
 */
public boolean shouldStopAtCheckpoint() {
return shouldStopAtCheckpoint;
}

public void setShouldStopAtCheckoint(boolean shouldStopAtCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Objects that are stored in the cluster state are supposed to be immutable. This class was already breaching that rule for node. It doesn't cause a problem given the way it's used, because DataFrameTransformState objects are deconstructed in the constructor of DataFrameTransformTask and newly constructed in DataFrameTransformTask.getState(). So the only long-term DataFrameTransformState objects are in the cluster state itself. But I do wonder whether we should make this more idiomatic now, as it seems like a problem waiting to happen when a future maintainer doesn't realise all the subtleties that are going on here. Instead, DataFrameTransformState could have a builder that can be initialized with an existing object, then have one field changed, and then build a new object. Alternatively, it could have a copy constructor that allows everything to be copied except node or shouldStopAtCheckpoint; although, now that there are two fields that might need to be overridden, a builder is probably better. Alternatively, since DataFrameTransformTask.getState() constructs a new object, that method could have an overload that lets you specify the bits you want to be different from the current state.

The reason it's dangerous to have a mutable object in the cluster state is this:

  1. You have a reference to an object of a type that's stored in the cluster state
  2. You update that object
  3. You know that it needs to be updated in the cluster state of all nodes, so pass the updated object to the cluster state update API to do that
  4. The cluster state update API receives your change request, checks to see if the local cluster state has changed, and only if so broadcasts the change to all nodes

The problem arises if the reference in step 1 referred to the actual object in the local cluster state. If it did, then the check for changes in step 4 won't find any changes, because updating your object (which is of a type that's stored in the cluster state) has already updated the local cluster state in place. This then leads to the cluster state of the current node being different from the cluster state of all the other nodes, and you'll never find out from testing in a single-node cluster.

this.shouldStopAtCheckpoint = shouldStopAtCheckpoint;
}

public static DataFrameTransformState fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
Expand Down Expand Up @@ -214,6 +241,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeOptionalWriteable(node);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(shouldStopAtCheckpoint);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Locale;

public enum DataFrameTransformTaskState implements Writeable {
// TODO 8.x add a `STOPPING` state and BWC handling in ::fromString
STOPPED, STARTED, FAILED;

public static DataFrameTransformTaskState fromString(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ public class StopDataFrameTransformActionRequestTests extends AbstractWireSerial
@Override
protected Request createTestInstance() {
TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null;
Request request = new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), randomBoolean(), timeout, randomBoolean());
Request request = new Request(randomAlphaOfLengthBetween(1, 10),
randomBoolean(),
randomBoolean(),
timeout,
randomBoolean(),
randomBoolean());
if (randomBoolean()) {
request.setExpandedIds(new HashSet<>(Arrays.asList(generateRandomStringArray(5, 6, false))));
}
Expand All @@ -41,9 +46,10 @@ public void testSameButDifferentTimeout() {
boolean waitForCompletion = randomBoolean();
boolean force = randomBoolean();
boolean allowNoMatch = randomBoolean();
boolean waitForCheckpoint = randomBoolean();

Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch);
Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch);
Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch, waitForCheckpoint);
Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch, waitForCheckpoint);

assertNotEquals(r1,r2);
assertNotEquals(r1.hashCode(),r2.hashCode());
Expand All @@ -56,11 +62,11 @@ public void testMatch() {
DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + dataFrameId,
TaskId.EMPTY_TASK_ID, Collections.emptyMap());

Request request = new Request("unrelated", false, false, null, false);
Request request = new Request("unrelated", false, false, null, false, false);
request.setExpandedIds(Set.of("foo", "bar"));
assertFalse(request.match(dataFrameTask));

Request matchingRequest = new Request(dataFrameId, false, false, null, false);
Request matchingRequest = new Request(dataFrameId, false, false, null, false, false);
matchingRequest.setExpandedIds(Set.of(dataFrameId));
assertTrue(matchingRequest.match(dataFrameTask));

Expand Down