Skip to content

Commit

Permalink
ILM retryable async action steps (#50522)
Browse files Browse the repository at this point in the history
This adds support for retrying AsyncActionSteps: once ILM has been moved
back to the failed step, the async step is triggered again (the retry is
only attempted after the cluster state reflects that ILM was moved back
to the failed step).

This also marks the RolloverStep as retryable and adds an integration
test in which the RolloverStep fails to execute because the rolled-over
index already exists, verifying that the async action RolloverStep is
retried until the rolled-over index is deleted.
  • Loading branch information
andreidan committed Jan 3, 2020
1 parent e23e1bf commit 8bee5f4
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public RolloverStep(StepKey key, StepKey nextStepKey, Client client) {
super(key, nextStepKey, client);
}

/**
 * Reports this rollover step as retryable.
 *
 * @return always {@code true}
 */
@Override
public boolean isRetryable() {
return true;
}

@Override
public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -940,69 +940,142 @@ public void testILMRolloverRetriesOnReadOnlyBlock() throws Exception {
assertBusy(() -> assertThat(getStepKeyForIndex(firstIndex), equalTo(TerminalPolicyStep.KEY)));
}

/**
 * Checks that ILM keeps working on a rollover alias after the first index was rolled over manually:
 * the original index must reach the terminal step, the manually created second index must be managed
 * by ILM, and the second index must itself be rolled over into a third index.
 */
public void testILMRolloverOnManuallyRolledIndex() throws Exception {
    String originalIndex = index + "-000001";
    String secondIndex = index + "-000002";
    String thirdIndex = index + "-000003";

    // Set up a policy with rollover
    createNewSingletonPolicy("hot", new RolloverAction(null, null, 2L));
    Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes");
    createIndexTemplate.setJsonEntity("{" +
        "\"index_patterns\": [\"" + index + "-*\"], \n" +
        " \"settings\": {\n" +
        " \"number_of_shards\": 1,\n" +
        " \"number_of_replicas\": 0,\n" +
        " \"index.lifecycle.name\": \"" + policy + "\", \n" +
        " \"index.lifecycle.rollover_alias\": \"alias\"\n" +
        " }\n" +
        "}");
    client().performRequest(createIndexTemplate);

    createIndexWithSettings(
        originalIndex,
        Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0),
        true
    );

    // Index a document
    index(client(), originalIndex, "1", "foo", "bar");
    Request refreshOriginalIndex = new Request("POST", "/" + originalIndex + "/_refresh");
    client().performRequest(refreshOriginalIndex);

    // Manual rollover
    Request rolloverRequest = new Request("POST", "/alias/_rollover");
    rolloverRequest.setJsonEntity("{\n" +
        " \"conditions\": {\n" +
        " \"max_docs\": \"1\"\n" +
        " }\n" +
        "}"
    );
    client().performRequest(rolloverRequest);
    assertBusy(() -> assertTrue(indexExists(secondIndex)));

    // Index another document into the original index so the ILM rollover policy condition is met
    index(client(), originalIndex, "2", "foo", "bar");
    client().performRequest(refreshOriginalIndex);

    // Wait for the rollover policy to execute
    assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));

    // ILM should manage the second index after attempting (and skipping) rolling the original index.
    // Default to false so that a response without a "managed" entry fails the assertion instead of
    // passing it vacuously.
    assertBusy(() -> assertTrue((boolean) explainIndex(secondIndex).getOrDefault("managed", false)));

    // index some documents to trigger an ILM rollover
    index(client(), "alias", "1", "foo", "bar");
    index(client(), "alias", "2", "foo", "bar");
    index(client(), "alias", "3", "foo", "bar");
    Request refreshSecondIndex = new Request("POST", "/" + secondIndex + "/_refresh");
    client().performRequest(refreshSecondIndex);

    // ILM should rollover the second index even though it skipped the first one
    assertBusy(() -> assertThat(getStepKeyForIndex(secondIndex), equalTo(TerminalPolicyStep.KEY)));
    assertBusy(() -> assertTrue(indexExists(thirdIndex)));
}
/**
 * Checks that ILM keeps working on a rollover alias after the first index was rolled over manually:
 * the original index must reach the terminal step, the manually created second index must be managed
 * by ILM, and the second index must itself be rolled over into a third index.
 */
public void testILMRolloverOnManuallyRolledIndex() throws Exception {
    String originalIndex = index + "-000001";
    String secondIndex = index + "-000002";
    String thirdIndex = index + "-000003";

    // Set up a policy with rollover
    createNewSingletonPolicy("hot", new RolloverAction(null, null, 2L));
    Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes");
    createIndexTemplate.setJsonEntity("{" +
        "\"index_patterns\": [\"" + index + "-*\"], \n" +
        " \"settings\": {\n" +
        " \"number_of_shards\": 1,\n" +
        " \"number_of_replicas\": 0,\n" +
        " \"index.lifecycle.name\": \"" + policy + "\", \n" +
        " \"index.lifecycle.rollover_alias\": \"alias\"\n" +
        " }\n" +
        "}");
    client().performRequest(createIndexTemplate);

    createIndexWithSettings(
        originalIndex,
        Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0),
        true
    );

    // Index a document
    index(client(), originalIndex, "1", "foo", "bar");
    Request refreshOriginalIndex = new Request("POST", "/" + originalIndex + "/_refresh");
    client().performRequest(refreshOriginalIndex);

    // Manual rollover
    Request rolloverRequest = new Request("POST", "/alias/_rollover");
    rolloverRequest.setJsonEntity("{\n" +
        " \"conditions\": {\n" +
        " \"max_docs\": \"1\"\n" +
        " }\n" +
        "}"
    );
    client().performRequest(rolloverRequest);
    assertBusy(() -> assertTrue(indexExists(secondIndex)));

    // Index another document into the original index so the ILM rollover policy condition is met
    index(client(), originalIndex, "2", "foo", "bar");
    client().performRequest(refreshOriginalIndex);

    // Wait for the rollover policy to execute
    assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));

    // ILM should manage the second index after attempting (and skipping) rolling the original index.
    // Default to false so that a response without a "managed" entry fails the assertion instead of
    // passing it vacuously.
    assertBusy(() -> assertTrue((boolean) explainIndex(secondIndex).getOrDefault("managed", false)));

    // index some documents to trigger an ILM rollover
    index(client(), "alias", "1", "foo", "bar");
    index(client(), "alias", "2", "foo", "bar");
    index(client(), "alias", "3", "foo", "bar");
    Request refreshSecondIndex = new Request("POST", "/" + secondIndex + "/_refresh");
    client().performRequest(refreshSecondIndex);

    // ILM should rollover the second index even though it skipped the first one
    assertBusy(() -> assertThat(getStepKeyForIndex(secondIndex), equalTo(TerminalPolicyStep.KEY)));
    assertBusy(() -> assertTrue(indexExists(thirdIndex)));
}

/**
 * Checks that the async rollover step is retried while it keeps failing and succeeds once the cause
 * of the failure is removed. The rollover target index is created up front so the rollover cannot
 * succeed; the test then moves the policy onto {@code attempt-rollover}, waits until that step is
 * reported with a retry count of at least one, deletes the conflicting index and expects the
 * rollover to complete.
 */
public void testRolloverStepRetriesUntilRolledOverIndexIsDeleted() throws Exception {
    String index = this.index + "-000001";
    String rolledIndex = this.index + "-000002";

    createNewSingletonPolicy("hot", new RolloverAction(null, TimeValue.timeValueSeconds(1), null));

    // create the rolled index so the rollover of the first index fails
    createIndexWithSettings(
        rolledIndex,
        Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
            .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"),
        false
    );

    createIndexWithSettings(
        index,
        Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
            .put(LifecycleSettings.LIFECYCLE_NAME, policy)
            .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"),
        true
    );

    assertBusy(() -> assertThat((Integer) explainIndex(index).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)), 30,
        TimeUnit.SECONDS);

    Request moveToStepRequest = new Request("POST", "_ilm/move/" + index);
    moveToStepRequest.setJsonEntity("{\n" +
        " \"current_step\": {\n" +
        " \"phase\": \"hot\",\n" +
        " \"action\": \"rollover\",\n" +
        " \"name\": \"check-rollover-ready\"\n" +
        " },\n" +
        " \"next_step\": {\n" +
        " \"phase\": \"hot\",\n" +
        " \"action\": \"rollover\",\n" +
        " \"name\": \"attempt-rollover\"\n" +
        " }\n" +
        "}");

    // Using {@link #waitUntil} here as ILM moves back and forth between the {@link WaitForRolloverReadyStep} step and
    // {@link org.elasticsearch.xpack.core.ilm.ErrorStep} in order to retry the failing step. As {@link #assertBusy}
    // increases the wait time between calls exponentially, we might miss the window where the policy is on
    // {@link WaitForRolloverReadyStep} and the move to `attempt-rollover` request will not be successful.
    boolean movedToAttemptRollover = waitUntil(() -> {
        try {
            return client().performRequest(moveToStepRequest).getStatusLine().getStatusCode() == 200;
        } catch (IOException e) {
            // the move is rejected while the policy is not on the expected step; keep polling
            return false;
        }
    }, 30, TimeUnit.SECONDS);
    // Fail here rather than letting a timeout surface later as an unrelated assertion failure.
    assertTrue("timed out moving index [" + index + "] to the [attempt-rollover] step", movedToAttemptRollover);

    // Similar to above, using {@link #waitUntil} as we want to make sure the `attempt-rollover` step started failing and is being
    // retried (which means ILM moves back and forth between the `attempt-rollover` step and the `error` step)
    boolean attemptRolloverRetried = waitUntil(() -> {
        try {
            Map<String, Object> explainIndexResponse = explainIndex(index);
            String step = (String) explainIndexResponse.get("step");
            Integer retryCount = (Integer) explainIndexResponse.get(FAILED_STEP_RETRY_COUNT_FIELD);
            return step != null && step.equals("attempt-rollover") && retryCount != null && retryCount >= 1;
        } catch (IOException e) {
            return false;
        }
    }, 30, TimeUnit.SECONDS);
    assertTrue("timed out waiting for the [attempt-rollover] step of index [" + index + "] to be retried", attemptRolloverRetried);

    deleteIndex(rolledIndex);

    // the rollover step should eventually succeed
    assertBusy(() -> assertThat(indexExists(rolledIndex), is(true)));
    assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY)));
}

public void testHistoryIsWrittenWithSuccess() throws Exception {
String index = "index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,20 @@ public void onFailure(String source, Exception e) {
logger.error(new ParameterizedMessage("retry execution of step [{}] for index [{}] failed",
failedStep.getKey().getName(), index), e);
}

/**
 * Runs after the cluster state update has been processed. If the state actually changed and the index
 * is on a non-terminal step, hands that step to {@code maybeRunAsyncAction} so an async action step
 * gets executed again as part of the automatic retry.
 *
 * @param source   description of the cluster state update that was processed
 * @param oldState cluster state before the update
 * @param newState cluster state after the update
 */
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
    if (oldState.equals(newState)) {
        // the update was a no-op, so there is nothing new to act on
        return;
    }

    IndexMetaData newIndexMeta = newState.metaData().index(index);
    if (newIndexMeta == null) {
        // The index is absent from the new state. Check this before resolving the current step:
        // the previous code passed the possibly-null metadata on and only checked it afterwards.
        return;
    }

    Step indexMetaCurrentStep = getCurrentStep(stepRegistry, policy, newIndexMeta);
    if (indexMetaCurrentStep == null) {
        // NOTE(review): assumes getCurrentStep can return null when no step resolves - confirm.
        // Guarding here avoids a NullPointerException on getKey() below.
        return;
    }

    StepKey stepKey = indexMetaCurrentStep.getKey();
    if (stepKey != null && stepKey != TerminalPolicyStep.KEY) {
        logger.trace("policy [{}] for index [{}] was moved back on the failed step for as part of an automatic " +
            "retry. Attempting to execute the failed step [{}] if it's an async action", policy, index, stepKey);
        maybeRunAsyncAction(newState, newIndexMeta, policy, stepKey);
    }
}
});
} else {
logger.debug("policy [{}] for index [{}] on an error step after a terminal error, skipping execution", policy, index);
Expand Down

0 comments on commit 8bee5f4

Please sign in to comment.