Skip to content

Commit

Permalink
[Transform] Make force-stopping the transform always remove persisten…
Browse files Browse the repository at this point in the history
…t task from cluster state (#106989)
  • Loading branch information
przemekwitek committed Apr 10, 2024
1 parent 6507ba5 commit e21f2e3
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 77 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/106989.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 106989
summary: Make force-stopping the transform always remove persistent task from cluster
state
area: Transform
type: bug
issues:
- 106811
Original file line number Diff line number Diff line change
Expand Up @@ -241,38 +241,39 @@ public void testTransformLifecycleInALoop() throws Exception {
long sleepAfterStartMillis = randomLongBetween(0, 5_000);
boolean force = randomBoolean();
try {
// Create the continuous transform
// Create the continuous transform.
putTransform(transformId, config, RequestOptions.DEFAULT);
assertThat(getTransformTasks(), is(empty()));
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));

startTransform(transformId, RequestOptions.DEFAULT);
// There is 1 transform task after start
// There is 1 transform task after start.
assertThat(getTransformTasks(), hasSize(1));
assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));

Thread.sleep(sleepAfterStartMillis);
// There should still be 1 transform task as the transform is continuous
// There should still be 1 transform task as the transform is continuous.
assertThat(getTransformTasks(), hasSize(1));
assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));

// Stop the transform with force set randomly
// Stop the transform with force set randomly.
stopTransform(transformId, true, null, false, force);
// After the transform is stopped, there should be no transform task left
assertThat(getTransformTasks(), is(empty()));
if (force) {
// If the "force" has been used, then the persistent task is removed from the cluster state but the local task can still
// be seen by the PersistentTasksNodeService. We need to wait until PersistentTasksNodeService reconciles the state.
assertBusy(() -> assertThat(getTransformTasks(), is(empty())));
} else {
// If the "force" hasn't been used then we can expect the local task to be already gone.
assertThat(getTransformTasks(), is(empty()));
}
// After the transform is stopped, there should be no transform task left in the cluster state.
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));

// Delete the transform
deleteTransform(transformId);
} catch (AssertionError | Exception e) {
throw new AssertionError(
format(
"Failure at iteration %d (sleepAfterStartMillis=%s,force=%s): %s",
i,
sleepAfterStartMillis,
force,
e.getMessage()
),
format("Failure at iteration %d (sleepAfterStart=%sms,force=%s): %s", i, sleepAfterStartMillis, force, e.getMessage()),
e
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@

public abstract class TransformRestTestCase extends TransformCommonRestTestCase {

protected static final String AUTH_KEY = "Authorization";
protected static final String SECONDARY_AUTH_KEY = "es-secondary-authorization";

private final Set<String> createdTransformIds = new HashSet<>();

protected void cleanUp() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.Strings;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;

Expand All @@ -19,6 +18,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.core.Strings.format;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -86,10 +86,10 @@ public void testTaskRemovalAfterInternalIndexGotDeleted() throws Exception {
deleteTransform(transformId);
}

public void testCreateAndDeleteTransformInALoop() throws IOException {
public void testBatchTransformLifecycltInALoop() throws IOException {
createReviewsIndex();

String transformId = "test_create_and_delete_in_a_loop";
String transformId = "test_batch_lifecycle_in_a_loop";
String destIndex = transformId + "-dest";
for (int i = 0; i < 100; ++i) {
try {
Expand All @@ -108,7 +108,48 @@ public void testCreateAndDeleteTransformInALoop() throws IOException {
// Delete the transform
deleteTransform(transformId);
} catch (AssertionError | Exception e) {
fail("Failure at iteration " + i + ": " + e.getMessage());
throw new AssertionError(format("Failure at iteration %d: %s", i, e.getMessage()), e);
}
}
}

public void testContinuousTransformLifecycleInALoop() throws Exception {
createReviewsIndex();

String transformId = "test_cont_lifecycle_in_a_loop";
String destIndex = transformId + "-dest";
for (int i = 0; i < 100; ++i) {
long sleepAfterStartMillis = randomLongBetween(0, 5_000);
boolean force = randomBoolean();
try {
// Create the continuous transform.
createContinuousPivotReviewsTransform(transformId, destIndex, null);
assertThat(getTransformTasks(), is(empty()));
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));

startTransform(transformId);
// There is 1 transform task after start.
assertThat(getTransformTasks(), hasSize(1));
assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));

Thread.sleep(sleepAfterStartMillis);
// There should still be 1 transform task as the transform is continuous.
assertThat(getTransformTasks(), hasSize(1));
assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));

// Stop the transform with force set randomly.
stopTransform(transformId, force);
// After the transform is stopped, there should be no transform task left.
assertThat(getTransformTasks(), is(empty()));
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));

// Delete the transform.
deleteTransform(transformId);
} catch (AssertionError | Exception e) {
throw new AssertionError(
format("Failure at iteration %d (sleepAfterStart=%sms,force=%s): %s", i, sleepAfterStartMillis, force, e.getMessage()),
e
);
}
}
}
Expand Down Expand Up @@ -168,7 +209,7 @@ private void beEvilAndDeleteTheTransformIndex() throws IOException {
}

private static String createConfig(String sourceIndex, String destIndex) {
return Strings.format("""
return format("""
{
"source": {
"index": "%s"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void testForceStopFailedTransform() throws Exception {
createContinuousPivotReviewsTransform(transformId, transformIndex, null);

assertThat(getTransformTasks(), is(empty()));
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));

startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Expand All @@ -78,6 +79,7 @@ public void testForceStopFailedTransform() throws Exception {
assertThat((String) XContentMapValues.extractValue("reason", fullState), startsWith(failureReason));

assertThat(getTransformTasks(), hasSize(1));
assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));

// verify that we cannot stop a failed transform
ResponseException ex = expectThrows(ResponseException.class, () -> stopTransform(transformId, false));
Expand All @@ -99,6 +101,7 @@ public void testForceStopFailedTransform() throws Exception {
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));

assertThat(getTransformTasks(), is(empty()));
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
}

public void testForceResetFailedTransform() throws Exception {
Expand All @@ -109,6 +112,7 @@ public void testForceResetFailedTransform() throws Exception {
createContinuousPivotReviewsTransform(transformId, transformIndex, null);

assertThat(getTransformTasks(), is(empty()));
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));

startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Expand All @@ -122,6 +126,7 @@ public void testForceResetFailedTransform() throws Exception {
assertThat((String) XContentMapValues.extractValue("reason", fullState), startsWith(failureReason));

assertThat(getTransformTasks(), hasSize(1));
assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));

// verify that we cannot reset a failed transform
ResponseException ex = expectThrows(ResponseException.class, () -> resetTransform(transformId, false));
Expand All @@ -135,6 +140,7 @@ public void testForceResetFailedTransform() throws Exception {
resetTransform(transformId, true);

assertThat(getTransformTasks(), is(empty()));
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
}

public void testStartFailedTransform() throws Exception {
Expand All @@ -145,6 +151,7 @@ public void testStartFailedTransform() throws Exception {
createContinuousPivotReviewsTransform(transformId, transformIndex, null);

assertThat(getTransformTasks(), is(empty()));
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));

startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Expand All @@ -158,6 +165,7 @@ public void testStartFailedTransform() throws Exception {
assertThat((String) XContentMapValues.extractValue("reason", fullState), startsWith(failureReason));

assertThat(getTransformTasks(), hasSize(1));
assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));

var expectedFailure = "Unable to start transform [test-force-start-failed-transform] "
+ "as it is in a failed state. Use force stop and then restart the transform once error is resolved. More details: ["
Expand All @@ -172,6 +180,7 @@ public void testStartFailedTransform() throws Exception {
stopTransform(transformId, true);

assertThat(getTransformTasks(), is(empty()));
assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
}

private void awaitState(String transformId, TransformStats.State state) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,23 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
state
);

final ActionListener<Response> doExecuteListener;
if (transformNodeAssignments.getWaitingForAssignment().size() > 0) {
doExecuteListener = cancelTransformTasksWithNoAssignment(finalListener, transformNodeAssignments);
} else {
doExecuteListener = finalListener;
}
final ActionListener<Response> doExecuteListener = cancelTransformTasksListener(
transformNodeAssignments.getWaitingForAssignment(),
finalListener
);

if (transformNodeAssignments.getExecutorNodes().size() > 0) {
if (request.isForce()) {
// When force==true, we **do not** fan out to individual tasks (i.e. taskOperation method will not be called) as we
// want to make sure that the persistent tasks will be removed from cluster state even if these tasks are no longer
// visible by the PersistentTasksService.
cancelTransformTasksListener(transformNodeAssignments.getAssigned(), doExecuteListener).onResponse(
new Response(true)
);
} else if (transformNodeAssignments.getExecutorNodes().isEmpty()) {
doExecuteListener.onResponse(new Response(true));
} else {
request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0]));
super.doExecute(task, request, doExecuteListener);
} else {
doExecuteListener.onResponse(new Response(true));
}
}, e -> {
if (e instanceof ResourceNotFoundException) {
Expand All @@ -189,13 +194,10 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
listener.onFailure(e);
// found transforms without a config
} else if (request.isForce()) {
final ActionListener<Response> doExecuteListener;

if (transformNodeAssignments.getWaitingForAssignment().size() > 0) {
doExecuteListener = cancelTransformTasksWithNoAssignment(finalListener, transformNodeAssignments);
} else {
doExecuteListener = finalListener;
}
final ActionListener<Response> doExecuteListener = cancelTransformTasksListener(
transformNodeAssignments.getWaitingForAssignment(),
finalListener
);

if (transformNodeAssignments.getExecutorNodes().size() > 0) {
request.setExpandedIds(transformNodeAssignments.getAssigned());
Expand Down Expand Up @@ -235,28 +237,13 @@ protected void taskOperation(
TransformTask transformTask,
ActionListener<Response> listener
) {

Set<String> ids = request.getExpandedIds();
if (ids == null) {
listener.onFailure(new IllegalStateException("Request does not have expandedIds set"));
return;
}

if (ids.contains(transformTask.getTransformId())) {
if (request.isForce()) {
// If force==true, we skip the additional step (setShouldStopAtCheckpoint) and move directly to shutting down the task.
// This way we ensure that the persistent task is removed ASAP (as opposed to being removed in one of the listeners).
try {
// Here the task is deregistered in scheduler and marked as completed in persistent task service.
transformTask.shutdown();
// Here the indexer is aborted so that its thread finishes work ASAP.
transformTask.onCancelled();
listener.onResponse(new Response(true));
} catch (ElasticsearchException ex) {
listener.onFailure(ex);
}
return;
}
// move the call to the generic thread pool, so we do not block the network thread
threadPool.generic().execute(() -> {
transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap(r -> {
Expand Down Expand Up @@ -306,7 +293,6 @@ protected StopTransformAction.Response newResponse(
}

private ActionListener<Response> waitForStopListener(Request request, ActionListener<Response> listener) {

ActionListener<Response> onStopListener = ActionListener.wrap(
waitResponse -> transformConfigManager.refresh(ActionListener.wrap(r -> listener.onResponse(waitResponse), e -> {
if ((ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) == false) {
Expand Down Expand Up @@ -393,6 +379,7 @@ private void waitForTransformStopped(
) {
// This map is accessed in the predicate and the listener callbacks
final Map<String, ElasticsearchException> exceptions = new ConcurrentHashMap<>();

persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetadata -> {
if (persistentTasksCustomMetadata == null) {
return true;
Expand Down Expand Up @@ -501,34 +488,38 @@ private void waitForTransformStopped(
}));
}

private ActionListener<Response> cancelTransformTasksWithNoAssignment(
final ActionListener<Response> finalListener,
final TransformNodeAssignments transformNodeAssignments
/**
* Creates and returns the listener that sends remove request for every task in the given set.
*
* @param transformTasks set of transform tasks that should be removed
* @param finalListener listener that should be called once all the given tasks are removed
* @return listener that removes given tasks in parallel
*/
private ActionListener<Response> cancelTransformTasksListener(
final Set<String> transformTasks,
final ActionListener<Response> finalListener
) {
final ActionListener<Response> doExecuteListener = ActionListener.wrap(response -> {
if (transformTasks.isEmpty()) {
return finalListener;
}
return ActionListener.wrap(response -> {
GroupedActionListener<PersistentTask<?>> groupedListener = new GroupedActionListener<>(
transformNodeAssignments.getWaitingForAssignment().size(),
ActionListener.wrap(r -> {
finalListener.onResponse(response);
}, finalListener::onFailure)
transformTasks.size(),
ActionListener.wrap(r -> finalListener.onResponse(response), finalListener::onFailure)
);

for (String unassignedTaskId : transformNodeAssignments.getWaitingForAssignment()) {
persistentTasksService.sendRemoveRequest(unassignedTaskId, null, groupedListener);
for (String taskId : transformTasks) {
persistentTasksService.sendRemoveRequest(taskId, null, groupedListener);
}

}, e -> {
GroupedActionListener<PersistentTask<?>> groupedListener = new GroupedActionListener<>(
transformNodeAssignments.getWaitingForAssignment().size(),
ActionListener.wrap(r -> {
finalListener.onFailure(e);
}, finalListener::onFailure)
transformTasks.size(),
ActionListener.wrap(r -> finalListener.onFailure(e), finalListener::onFailure)
);

for (String unassignedTaskId : transformNodeAssignments.getWaitingForAssignment()) {
persistentTasksService.sendRemoveRequest(unassignedTaskId, null, groupedListener);
for (String taskId : transformTasks) {
persistentTasksService.sendRemoveRequest(taskId, null, groupedListener);
}
});
return doExecuteListener;
}
}

0 comments on commit e21f2e3

Please sign in to comment.