[ML][Data Frame] add support for wait_for_checkpoint flag on _stop API #45469

Conversation

benwtrent
Member

This adds a new flag wait_for_checkpoint for the _stop transform API.

The behavior is as follows:

  • If the indexer is running, the transform will not stop until the next checkpoint is reached
  • If the indexer is NOT running, the transform will stop immediately
  • If the indexer has failed (or fails after the call is made), the task is still marked as failed, and the user will have to stop it with force=true.

Implementation details:

  • I opted for storing this flag and its state in the cluster state, as using the index did not make sense when considering the race conditions around a user calling _stop right when a doSaveState action is being called: the persisted state MAY be overwritten.
  • Not utilizing a STOPPING state since the enumerations are stored in the index, and attempting to read in a new value via the XContentParser on an old node would blow up.

closes #45293
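
For reference, a minimal sketch of how the new flag could be passed over REST via the low-level Java client. The endpoint path follows the 7.x _data_frame APIs; the transform id, class name, and client setup are illustrative assumptions, not code from this PR:

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

class StopTransformSketch {
    // Sketch only: stop a transform and ask it to wait for the next checkpoint.
    // "transformId" is a placeholder and the RestClient is assumed to be configured elsewhere.
    static Response stopAtNextCheckpoint(RestClient client, String transformId) throws Exception {
        Request request = new Request("POST", "/_data_frame/transforms/" + transformId + "/_stop");
        request.addParameter("wait_for_checkpoint", "true"); // the flag added by this PR
        return client.performRequest(request);
    }
}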

@elasticmachine
Collaborator

Pinging @elastic/ml-core

listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE));
// To cover strange state race conditions, we adjust the variable first (which writes to cluster state if it is different)
// then we stop the transform
transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap(
Member Author

Since the default value for wait_for_checkpoint is true and folks could call this against _all transforms, this could get pretty expensive as there will be a cluster state update for each individual task.

Do we want to reconsider the default value of true or do we think this behavior is acceptable?

Contributor

We already do a cluster state update for each transform when _all are stopped, because removing each persistent task is a cluster state update.

With this change, if there are 50 started transforms then we'll go from doing 50 cluster state updates when stopping _all to 100: 50 to set the flags and 50 later on to remove the tasks. So it's not like we're going from being completely independent of cluster state to suddenly creating a large cluster state update load.

One thing we could consider, if we think most people will stick with the default of wait_for_checkpoint: true, is to default this variable to true when the first DataFrameTransformState is created, i.e. when the persistent task is first created. Then it won't need updating in the common case, only when people have overridden wait_for_checkpoint: false.

Member Author

The difficulty with having it default to true within the task is that we would then need two booleans:

  • one that indicates that when we stop, we should wait for the checkpoint
  • one that indicates that we are stopping

Because checkpoints are completed when onFinish is called, we need a way to determine that we are "stopping" and that the task should complete. In this PR, we rely on the flag being set to indicate that the task should stop when onFinish completes.
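
A minimal model of the single-flag approach described above; the class, field, and method names are simplified assumptions, not the PR's actual code:

import java.util.concurrent.atomic.AtomicBoolean;

class SingleFlagModel {
    // The one flag does double duty: "wait for the checkpoint" and "we are stopping".
    private final AtomicBoolean shouldStopAtCheckpoint = new AtomicBoolean(false);
    private volatile String indexerState = "STARTED";

    // Set by the _stop handler.
    void requestStopAtNextCheckpoint() {
        shouldStopAtCheckpoint.set(true);
    }

    // Analogous to onFinish: called when a checkpoint completes.
    void onCheckpointFinished() {
        if (shouldStopAtCheckpoint.get()) {
            // No separate "stopping" boolean is needed; the flag itself
            // signals that the task should shut down here.
            indexerState = "STOPPED";
        }
    }
}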

@@ -237,6 +240,10 @@ public synchronized void start(Long startingCheckpoint, ActionListener<Response>
getTransformId()));
return;
}
if (shouldStopAtCheckpoint) {
Member Author

We don't want start to be called if shouldStopAtCheckpoint is set.

Honestly, calling start here is pretty harmless, but I think it is better to throw to alert the user that "hey, we are stopping this thing soon".
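
A rough sketch of the kind of guard being described, throwing a conflict back to the caller; the class name, message, and helper signature are illustrative, not the PR's exact code:

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.rest.RestStatus;

class StartGuardSketch {
    // Sketch only: reject start while a stop-at-checkpoint has been requested.
    static void ensureNotStopping(String transformId, boolean shouldStopAtCheckpoint) {
        if (shouldStopAtCheckpoint) {
            throw new ElasticsearchStatusException(
                "Cannot start transform [" + transformId + "] as it is currently stopping at the next checkpoint",
                RestStatus.CONFLICT);
        }
    }
}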

Member

This covers the case where the DF is stopping with wait_for_checkpoint=true and then someone tries to start it again via the API?

Member Author

Yes @davidkyle. As I said, calling start here should not cause problems, but if the user is attempting to start it after somebody has already called _stop?wait_for_checkpoint=true, I think we should throw an error indicating that we are stopping.

@@ -288,15 +323,19 @@ public synchronized void stop() {
return;
}

if (getIndexer().getState() == IndexerState.STOPPED) {
if (getIndexer().getState() == IndexerState.STOPPED || getIndexer().getState() == IndexerState.STOPPING) {
Member Author

This should have always been here; we should not allow the execution path to continue if we are already STOPPING.

@@ -313,6 +352,18 @@ public synchronized void triggered(Event event) {
}

logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), getIndexer().getState());
// If we are failed, don't trigger
if (taskState.get() == DataFrameTransformTaskState.FAILED) {
Member Author

Having these checks earlier (they are also done lower down in the indexer) allows for simpler reasoning about what is done within the synchronized stop, start, and triggered methods.

@@ -824,6 +881,7 @@ protected void onFinish(ActionListener<Void> listener) {
nextCheckpoint = null;
// Reset our failure count as we have finished and may start again with a new checkpoint
failureCount.set(0);
transformTask.stateReason.set(null);
Member Author

If we are finishing, the state reason should be cleared out regardless.

@benwtrent
Member Author

run elasticsearch-ci/bwc
run elasticsearch-ci/default-distro

@benwtrent
Member Author

@elasticmachine update branch

Contributor

@droberts195 droberts195 left a comment

I think there are quite a few tricky issues to think about with this. I left a few comments. Maybe we also need to discuss it further as a group on the weekly call.

return shouldStopAtCheckpoint;
}

public void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) {
Contributor

Objects that are stored in the cluster state are supposed to be immutable. This class was already breaching that rule for node. It doesn't cause a problem given the way it's used, because DataFrameTransformState objects are deconstructed in the constructor of DataFrameTransformTask and newly constructed in DataFrameTransformTask.getState(), so the only long-term DataFrameTransformState objects are in the cluster state itself. But I do wonder whether we should make this more idiomatic now, as it seems like a problem waiting to happen when a future maintainer doesn't realise all the subtleties going on here.

Instead, DataFrameTransformState could have a builder that can be initialized with an existing object, have one field changed, and then build a new object. Alternatively it could have a copy constructor that allows everything to be copied except node or shouldStopAtCheckpoint, although now that there are two fields that might need to be overridden, a builder is probably better. Or, since DataFrameTransformTask.getState() constructs a new object, that method could have an overload that lets you specify the bits you want to be different from the current state.

The reason it's dangerous to have a mutable object in the cluster state is this:

  1. You have a reference to an object of a type that's stored in the cluster state
  2. You update that object
  3. You know that it needs to be updated in the cluster state of all nodes, so pass the updated object to the cluster state update API to do that
  4. The cluster state update API receives your change request, checks to see if the local cluster state has changed, and only if so broadcasts the change to all nodes

The problem arises if the reference in step 1 referred to the actual object in the local cluster state. If it did then the check for changes in step 4 won't find any changes because when you updated your object that was of a type that's stored in the cluster state it actually did update the local cluster state. This then leads to the cluster state of the current node being different to the cluster state of all the other nodes, and you'll never find out from testing in a single node cluster.
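
A sketch of the builder approach suggested above, in which a changed copy is always a brand-new object, so the "did anything change?" comparison in step 4 still sees a difference. The class and field names are simplified assumptions, not the real DataFrameTransformState:

final class TransformStateSketch {
    private final String node;
    private final boolean shouldStopAtCheckpoint;

    private TransformStateSketch(Builder builder) {
        this.node = builder.node;
        this.shouldStopAtCheckpoint = builder.shouldStopAtCheckpoint;
    }

    // Seed a builder from an existing (immutable) state object.
    static Builder builder(TransformStateSketch existing) {
        Builder builder = new Builder();
        builder.node = existing.node;
        builder.shouldStopAtCheckpoint = existing.shouldStopAtCheckpoint;
        return builder;
    }

    static final class Builder {
        private String node;
        private boolean shouldStopAtCheckpoint;

        Builder setShouldStopAtCheckpoint(boolean value) {
            this.shouldStopAtCheckpoint = value;
            return this;
        }

        TransformStateSketch build() {
            return new TransformStateSketch(this);
        }
    }
}

// Usage: TransformStateSketch updated =
//     TransformStateSketch.builder(current).setShouldStopAtCheckpoint(true).build();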

this.taskState = taskState;
this.indexerState = indexerState;
this.position = position;
this.checkpoint = checkpoint;
this.reason = reason;
this.progress = progress;
this.node = node;
this.shouldStopAtCheckpoint = shouldStopAtCheckpoint;
Contributor

This is not included in the X-Content representation. Is there a good reason for that?

I guess it's because we don't want this to go into the version of this object that gets persisted to the index as part of a DataFrameTransformStoredDoc? But omitting it from the X-Content representation also means it won't survive in cluster state if there's a full cluster restart.

There are other comments saying that in 8.x we want to replace this with a STOPPING enum value. But that would be persisted both in the DataFrameTransformStoredDoc and in the on-disk version of the cluster state. So there's an inconsistency here.
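
One possible way to resolve that inconsistency would be to serialize the flag in the cluster-state X-Content but skip it when building the stored doc, keyed off a serialization param. A sketch under that assumption; the param name and field name are hypothetical:

import java.io.IOException;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;

class StateXContentSketch {
    // Sketch only: include should_stop_at_checkpoint unless the caller asked
    // for the internal-storage (index) representation via a hypothetical param.
    static XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params,
                                      boolean shouldStopAtCheckpoint) throws IOException {
        builder.startObject();
        if (params.paramAsBoolean("for_internal_storage", false) == false) {
            builder.field("should_stop_at_checkpoint", shouldStopAtCheckpoint);
        }
        return builder.endObject();
    }
}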

Member Author

@droberts195
If the cluster state stores itself as XContent, how does it know how to deserialize the objects?

Also, if we are going to store this in the index, we may just want to bite the bullet and figure out how to ONLY store it in the index.

Member

@davidkyle davidkyle left a comment

This is tricky code to reason about; I will probably need another review pass. Looks OK, left a few comments.

Regarding the default wait_for_checkpoint: true, it makes sense as the default behaviour, but it does significantly change how stop works for continuous DFs. The change is for the best, so I'm +1 to it.

@@ -298,7 +307,7 @@ protected void createReviewsIndex(String indexName, int numDocs) throws Exceptio
.append("\"}");
bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON));

if (i % 50 == 0) {
if (i % 100 == 0) {
Member

👍 this could probably be 1000

assertTrue(startDataFrameTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());

// waitForCheckpoint: true should make the transform continue until we hit the first checkpoint, then it will stop
stopDataFrameTransform(transformId, false, null, true);
Member

@davidkyle davidkyle Aug 13, 2019

What happens if the checkpoint is hit before this stop request lands? And what happens if we are using a continuous DF which is at the checkpoint but there is no new data (getIndexer().sourceHasChanged() == false)? How does the DF get from STOPPING to STOPPED?

Member Author

@davidkyle you are 100% correct, there is a bad race condition here that could cause the transform to just stay running after the user called _stop.

  1. onFinish is called and has made it past the if (transformTask.shouldStopAtCheckpoint) { check
  2. The flag is set to true by the user
  3. stop is called and processed, but since the state is INDEXING nothing is done
  4. onFinish completes and sets the state to STARTED

The trick here is that the indexer transitions to STARTED and can get triggered again, even after failures. I think this also shows a bug in how we handle triggers to begin with: if we have not completed a checkpoint, I am not sure we should even check the source indices for changes more than once per checkpoint...

Let me mull this over

Member Author

I think something will have to be added to doSaveState to handle this race condition.

When doSaveState is called after onFinish, the indexer state is already STARTED. If the indexer state is STARTED and shouldStopAtCheckpoint == true, that should give some indication of the desired behavior. Though this may cause other issues, as doSaveState is also called when we hit an intermittent failure :(.

More careful thought is necessary for this one.
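
A rough sketch of the kind of check being floated here, not the PR's actual doSaveState; the state values and the stop callback are simplified assumptions:

class DoSaveStateGuardSketch {
    // Sketch only: if the indexer has already cycled back to STARTED but the
    // user asked to stop at the checkpoint, treat the checkpoint as reached
    // while the stop request was in flight and stop now instead of continuing.
    static void maybeStopOnSave(String indexerState, boolean shouldStopAtCheckpoint, Runnable stopTask) {
        if ("STARTED".equals(indexerState) && shouldStopAtCheckpoint) {
            stopTask.run();
        }
        // ... then persist state as usual ...
    }
}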

@benwtrent benwtrent removed the v7.4.0 label Aug 28, 2019
@@ -361,8 +411,10 @@ public synchronized void triggered(Event event) {
return;
}

if (taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getTransformId());
if (taskState.get() == DataFrameTransformTaskState.FAILED || taskState.get() == DataFrameTransformTaskState.STOPPED) {
Member Author

Having a trigger occur when taskState.get() == DataFrameTransformTaskState.STOPPED is not really possible; we no longer transition the task state to STOPPED. I put this check in as insurance, since we should not trigger on a stopped task.

// Since we save the state to an index, we should make sure that our task state is in parity with the indexer state
if (indexerState.equals(IndexerState.STOPPED)) {
transformTask.setTaskStateStopped();
}
Member Author

I removed this transition to STOPPED for the task state as there was no need, and it opened a window for start to be called again while we were in the middle of completing doSaveState while stopping.

@benwtrent
Member Author

This work is essentially blocked by #46156. Much of the boilerplate will stay the same, but we will be handling state differently. If we have optimistic concurrency protection for doSaveState, we don't really have to use cluster state for this flag. We could simply error out and tell the user to try again if the update does not occur.
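
For context, a generic sketch of what optimistic concurrency on an index write looks like, using seqNo/primaryTerm preconditions. This is not #46156 or this PR's doSaveState; the index name, doc id, and source body are placeholders:

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

class OptimisticWriteSketch {
    // Sketch only: the write succeeds only if the document's sequence number
    // and primary term still match what was last read; otherwise it fails with
    // a version conflict and the caller can ask the user to try again.
    static IndexRequest buildStateWrite(long lastSeenSeqNo, long lastSeenPrimaryTerm) {
        return new IndexRequest("transform-state-index")           // placeholder index name
            .id("my_transform_state")                              // placeholder doc id
            .source("{\"should_stop_at_checkpoint\":true}", XContentType.JSON)
            .setIfSeqNo(lastSeenSeqNo)
            .setIfPrimaryTerm(lastSeenPrimaryTerm);
    }
}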

@benwtrent benwtrent closed this Oct 11, 2019
@benwtrent benwtrent deleted the feature/ml-df-add-wait_for_checkpoint-flag branch October 11, 2019 17:05
Successfully merging this pull request may close these issues.

Add the option to wait until the next checkpoint when stopping a transform