Skip to content

Commit

Permalink
ILM: execute cached steps even if policy is updated (#75296) (#75443)
Browse files Browse the repository at this point in the history
This makes ILM honour the cached phase even when the underlying policy
is updated to not contain the cached actions.

Co-authored-by: Andrei Dan <andrei.dan@elastic.co>
  • Loading branch information
elasticsearchmachine and andreidan committed Jul 19, 2021
1 parent 2cae264 commit 2c16812
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ public static boolean isIndexPhaseDefinitionUpdatable(final NamedXContentRegistr
* information, returns null.
*/
@Nullable
static Set<Step.StepKey> readStepKeys(final NamedXContentRegistry xContentRegistry, final Client client,
final String phaseDef, final String currentPhase, final XPackLicenseState licenseState) {
public static Set<Step.StepKey> readStepKeys(final NamedXContentRegistry xContentRegistry, final Client client,
final String phaseDef, final String currentPhase, final XPackLicenseState licenseState) {
final PhaseExecutionInfo phaseExecutionInfo;
try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;

Expand All @@ -33,6 +34,9 @@

import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createIndexWithSettings;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;

public class ChangePolicyforIndexIT extends ESRestTestCase {

Expand Down Expand Up @@ -124,6 +128,37 @@ public void testChangePolicyForIndex() throws Exception {
assertEquals("javaRestTest-0,javaRestTest-1,javaRestTest-2,javaRestTest-3", includesAllocation);
}

public void testILMHonoursTheCachedPhaseAfterPolicyUpdate() throws Exception {
String indexName = "test-000001";
String policyName = "rolloverPolicy";
String alias = "thealias";
createNewSingletonPolicy(client(), policyName, "hot", new RolloverAction(null, null, null, 1L));

createIndexWithSettings(client(), indexName, alias, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
.put(LifecycleSettings.LIFECYCLE_NAME, policyName));

// Check the index is on the check-rollover-ready step
assertBusy(() -> assertStep(indexName, new StepKey("hot", RolloverAction.NAME, WaitForRolloverReadyStep.NAME)), 30,
TimeUnit.SECONDS);

// update the policy to not contain rollover
createNewSingletonPolicy(client(), policyName, "hot", new SetPriorityAction(200));

// Check the index is on the check-rollover-ready step
assertBusy(() -> assertStep(indexName, new StepKey("hot", RolloverAction.NAME, WaitForRolloverReadyStep.NAME)), 30,
TimeUnit.SECONDS);

indexDocument(client(), indexName, true);

String rolloverIndex = "test-000002";
// let's check the cached rollover action still executed and the rollover index exists
assertBusy(() -> indexExists(rolloverIndex), 30, TimeUnit.SECONDS);
assertBusy(() -> assertStep(indexName, PhaseCompleteStep.finalStep("hot").getKey()), 30, TimeUnit.SECONDS);
}

private void assertStep(String indexName, StepKey expectedStep) throws IOException {
Response explainResponse = client().performRequest(new Request("GET", "/" + indexName + "/_ilm/explain"));
assertOK(explainResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -84,8 +85,13 @@ public static void validateTransition(IndexMetadata idxMeta, Step.StepKey curren
"], currently: [" + realKey + "]");
}

// Always allow moving to the terminal step, even if it doesn't exist in the policy
if (stepRegistry.stepExists(indexPolicySetting, newStepKey) == false && newStepKey.equals(TerminalPolicyStep.KEY) == false) {
final Set<Step.StepKey> cachedStepKeys =
stepRegistry.parseStepKeysFromPhase(lifecycleState.getPhaseDefinition(), lifecycleState.getPhase());
boolean isNewStepCached = cachedStepKeys != null && cachedStepKeys.contains(newStepKey);

// Always allow moving to the terminal step or to a step that's present in the cached phase, even if it doesn't exist in the policy
if (isNewStepCached == false &&
(stepRegistry.stepExists(indexPolicySetting, newStepKey) == false && newStepKey.equals(TerminalPolicyStep.KEY) == false)) {
throw new IllegalArgumentException("step [" + newStepKey + "] for index [" + idxMeta.getIndex().getName() +
"] with policy [" + indexPolicySetting + "] does not exist");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.ClientHelper;
Expand All @@ -34,6 +34,7 @@
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.PhaseCacheManagement;
import org.elasticsearch.xpack.core.ilm.PhaseExecutionInfo;
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
Expand All @@ -45,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -143,6 +145,15 @@ public LifecyclePolicyMetadata read(StreamInput in, String key) {
}
}

/**
* Parses the step keys from the {@code phaseDef} for the given phase.
* Returns null if there's a parsing error.
*/
@Nullable
public Set<Step.StepKey> parseStepKeysFromPhase(String phaseDef, String currentPhase) {
return PhaseCacheManagement.readStepKeys(xContentRegistry, client, phaseDef, currentPhase, licenseState);
}

private List<Step> parseStepsFromPhase(String policy, String currentPhase, String phaseDef) throws IOException {
final PhaseExecutionInfo phaseExecutionInfo;
LifecyclePolicyMetadata policyMetadata = lifecyclePolicyMap.get(policy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.RolloverStep;
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -489,11 +491,61 @@ public void testValidateValidTransition() {
try {
IndexLifecycleTransition.validateTransition(indexMetadata, currentStepKey, nextStepKey, policyRegistry);
} catch (Exception e) {
logger.error(e);
logger.error(e.getMessage(), e);
fail("validateTransition should not throw exception on valid transitions");
}
}

public void testValidateTransitionToCachedStepMissingFromPolicy() {
LifecycleExecutionState.Builder executionState = LifecycleExecutionState.builder()
.setPhase("hot")
.setAction("rollover")
.setStep("check-rollover-ready")
.setPhaseDefinition("{\n" +
" \"policy\" : \"my-policy\",\n" +
" \"phase_definition\" : {\n" +
" \"min_age\" : \"20m\",\n" +
" \"actions\" : {\n" +
" \"rollover\" : {\n" +
" \"max_age\" : \"5s\"\n" +
" },\n" +
" \"set_priority\" : {\n" +
" \"priority\" : 150\n" +
" }\n" +
" }\n" +
" },\n" +
" \"version\" : 1,\n" +
" \"modified_date_in_millis\" : 1578521007076\n" +
" }");

IndexMetadata meta = buildIndexMetadata("my-policy", executionState);

Map<String, LifecycleAction> actions = new HashMap<>();
actions.put(SetPriorityAction.NAME, new SetPriorityAction(100));
Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
LifecyclePolicy policyWithoutRollover = new LifecyclePolicy("my-policy", phases);
LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(policyWithoutRollover, Collections.emptyMap(), 2L, 2L);

ClusterState existingState = ClusterState.builder(ClusterState.EMPTY_STATE)
.metadata(Metadata.builder(Metadata.EMPTY_METADATA)
.put(meta, false)
.build())
.build();
try (Client client = new NoOpClient(getTestName())) {
Step.StepKey currentStepKey = new Step.StepKey("hot", RolloverAction.NAME, WaitForRolloverReadyStep.NAME);
Step.StepKey nextStepKey = new Step.StepKey("hot", RolloverAction.NAME, RolloverStep.NAME);
Step currentStep = new WaitForRolloverReadyStep(currentStepKey, nextStepKey, client, null, null, null, 1L);
try {
IndexLifecycleTransition.validateTransition(meta, currentStepKey, nextStepKey, createOneStepPolicyStepRegistry("my-policy",
currentStep));
} catch (Exception e) {
logger.error(e.getMessage(), e);
fail("validateTransition should not throw exception on valid transitions");
}
}
}

public void testMoveClusterStateToFailedStep() {
String indexName = "my_index";
String policyName = "my_policy";
Expand Down

0 comments on commit 2c16812

Please sign in to comment.