Skip to content

Commit

Permalink
[7.13] Always handle non-complete AsyncActionStep execution (#73796) (#…
Browse files Browse the repository at this point in the history
…73853)

This commit ensures that `IndexLifecycleRunner` will always handle the execution of an
`AsyncActionStep`, regardless of it's returned status. In the event that it does not complete, the
index will move into the ERROR step with an exception, rather than being stuck in the step without a
way to execute the AsyncActionStep.

This also updates `CreateSnapshotStep` and `RolloverStep` to call `onFailure` with more helpful
exceptions rather than potentially returning `false` to the runner.

Resolves #73794
  • Loading branch information
dakrone committed Jun 7, 2021
1 parent 11064f9 commit 7ad3cdd
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.client.Client;
Expand All @@ -16,6 +17,8 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.snapshots.SnapshotInfo;

import java.util.Locale;

import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.fromIndexMetadata;

/**
Expand Down Expand Up @@ -76,8 +79,12 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
} else {
int failures = snapInfo.failedShards();
int total = snapInfo.totalShards();
logger.warn("failed to create snapshot successfully, {} failures out of {} total shards failed", failures, total);
listener.onResponse(false);
String message = String.format(Locale.ROOT,
"failed to create snapshot successfully, %s failures out of %s total shards failed", failures, total);
logger.warn(message);
ElasticsearchException failure = new ElasticsearchException(message,
snapInfo.shardFailures().stream().findFirst().orElse(null));
listener.onFailure(failure);
}
}, listener::onFailure));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ public void performAction(IndexMetadata indexMetadata, ClusterState currentClust
getClient().admin().indices().rolloverIndex(rolloverRequest,
ActionListener.wrap(response -> {
assert response.isRolledOver() : "the only way this rollover call should fail is with an exception";
listener.onResponse(response.isRolledOver());
if (response.isRolledOver()) {
listener.onResponse(true);
} else {
listener.onFailure(new IllegalStateException("unexepected exception on unconditional rollover"));
}
}, listener::onFailure));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,12 @@ public void onResponse(Boolean complete) {
// index since it will be... deleted.
registerDeleteOperation(indexMetadata);
}
} else {
// All steps *should* return true for complete, or invoke listener.onFailure
// with a useful exception. In the case that they don't, we move to error
// step here with a generic exception
moveToErrorStep(indexMetadata.getIndex(), policy, currentStep.getKey(),
new IllegalStateException("unknown exception for step " + currentStep.getKey() + " in policy " + policy));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
public void prepare() {
threadPool = new TestThreadPool("test");
noopClient = new NoOpClient(threadPool);
historyStore = new NoOpHistoryStore();
historyStore = new NoOpHistoryStore(noopClient);
}

@After
Expand Down Expand Up @@ -552,6 +552,77 @@ public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception {
"\"state\":{\"phase\":\"phase\",\"action\":\"action\",\"step\":\"async_action_step\",\"step_time\":\"0\"}}"));
}

public void testRunAsyncActionReturningFalseEntersError() throws Exception {
String policyName = "foo";
StepKey stepKey = new StepKey("phase", "action", "async_action_step");
StepKey nextStepKey = new StepKey("phase", "action", "cluster_state_action_step");
MockAsyncActionStep step = new MockAsyncActionStep(stepKey, nextStepKey);
MockClusterStateActionStep nextStep = new MockClusterStateActionStep(nextStepKey, null);
MockPolicyStepsRegistry stepRegistry = createMultiStepPolicyStepRegistry(policyName, Arrays.asList(step, nextStep));
stepRegistry.setResolver((i, k) -> {
if (stepKey.equals(k)) {
return step;
} else if (nextStepKey.equals(k)) {
return nextStep;
} else {
fail("should not try to retrieve different step");
return null;
}
});
LifecycleExecutionState les = LifecycleExecutionState.builder()
.setPhase("phase")
.setAction("action")
.setStep("async_action_step")
.build();
IndexMetadata indexMetadata = IndexMetadata.builder("test")
.settings(Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(LifecycleSettings.LIFECYCLE_NAME, policyName))
.putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, les.asMap())
.build();
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
DiscoveryNode node = clusterService.localNode();
IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING);
ClusterState state = ClusterState.builder(new ClusterName("cluster"))
.metadata(Metadata.builder()
.put(indexMetadata, true)
.putCustom(IndexLifecycleMetadata.TYPE, ilm))
.nodes(DiscoveryNodes.builder()
.add(node)
.masterNodeId(node.getId())
.localNodeId(node.getId()))
.build();
ClusterServiceUtils.setState(clusterService, state);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);

CountDownLatch latch = new CountDownLatch(1);
step.setLatch(latch);
step.setWillComplete(false);
ClusterState before = clusterService.state();
runner.maybeRunAsyncAction(before, indexMetadata, policyName, stepKey);

// Wait for the action step to finish
awaitLatch(latch, 5, TimeUnit.SECONDS);

assertThat(step.getExecuteCount(), equalTo(1L));

try {
assertBusy(() -> {
ILMHistoryItem historyItem = historyStore.getItems().stream()
.findFirst()
.orElseThrow(() -> new AssertionError("failed to register ILM history"));
assertThat(historyItem.toString(),
containsString("\"{\\\"type\\\":\\\"illegal_state_exception\\\",\\\"reason\\\":" +
"\\\"unknown exception for step {\\\\\\\"phase\\\\\\\":\\\\\\\"phase\\\\\\\",\\\\\\\"action" +
"\\\\\\\":\\\\\\\"action\\\\\\\",\\\\\\\"name\\\\\\\":\\\\\\\"async_action_step\\\\\\\"} in policy foo\\\""));
});
} finally {
clusterService.close();
}
}

public void testRunPeriodicStep() throws Exception {
String policyName = "foo";
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
Expand Down Expand Up @@ -880,7 +951,7 @@ public static void assertClusterStateOnNextStep(ClusterState oldClusterState, In
static class MockAsyncActionStep extends AsyncActionStep {

private Exception exception;
private boolean willComplete;
private boolean willComplete = true;
private boolean indexSurvives = true;
private long executeCount = 0;
private CountDownLatch latch;
Expand Down Expand Up @@ -1178,11 +1249,12 @@ public static MockPolicyStepsRegistry createMultiStepPolicyStepRegistry(String p
return new MockPolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, REGISTRY, client);
}

private class NoOpHistoryStore extends ILMHistoryStore {
private static class NoOpHistoryStore extends ILMHistoryStore {
private static final Logger logger = LogManager.getLogger(NoOpHistoryStore.class);

private final List<ILMHistoryItem> items = new ArrayList<>();

NoOpHistoryStore() {
NoOpHistoryStore(Client noopClient) {
super(Settings.EMPTY, noopClient, null, null);
}

Expand Down

0 comments on commit 7ad3cdd

Please sign in to comment.