diff --git a/docs/reference/ilm/policy-definitions.asciidoc b/docs/reference/ilm/policy-definitions.asciidoc index 7114516e4d2a9..67fcdba9b76ef 100644 --- a/docs/reference/ilm/policy-definitions.asciidoc +++ b/docs/reference/ilm/policy-definitions.asciidoc @@ -55,7 +55,7 @@ PUT _ilm/policy/my_policy } -------------------------------------------------- -The Above example configures a policy that moves the index into the warm +The above example configures a policy that moves the index into the warm phase after one day. Until then, the index is in a waiting state. After moving into the warm phase, it will wait until 30 days have elapsed before moving to the delete phase and deleting the index. @@ -76,10 +76,14 @@ check occurs. === Phase Execution The current phase definition, of an index's policy being executed, is stored -in the index's metadata. The phase and its actions are compiled into a series -of discrete steps that are executed sequentially. Since some {ilm-init} actions -are more complex and involve multiple operations against an index, each of these -operations are done in isolation in a unit called a "step". The +in the index's metadata. This phase definition is cached to prevent changes to +the policy from putting the index in a state where it cannot proceed from its +current step. When the policy is updated we check to see if this phase +definition can be safely updated, and if so, update the cached definition in +indices using the updated policy. The phase and its actions are compiled into a +series of discrete steps that are executed sequentially. Since some {ilm-init} +actions are more complex and involve multiple operations against an index, each +of these operations are done in isolation in a unit called a "step". The <> exposes this information to us to see which step our index is either to execute next, or is currently executing. diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index ca35a6bee0e03..b7fc5bdcb69e8 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -1304,6 +1304,41 @@ public void testRetryableInitializationStep() throws Exception { }); } + public void testRefreshablePhaseJson() throws Exception { + String index = "refresh-index"; + + createNewSingletonPolicy("hot", new RolloverAction(null, null, 100L)); + Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\""+ index + "-*\"], \n" + + " \"settings\": {\n" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 0,\n" + + " \"index.lifecycle.name\": \"" + policy+ "\",\n" + + " \"index.lifecycle.rollover_alias\": \"alias\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + createIndexWithSettings(index + "-1", + Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0), + true); + + // Index a document + index(client(), index + "-1", "1", "foo", "bar"); + + // Wait for the index to enter the check-rollover-ready step + assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1").getName(), equalTo(WaitForRolloverReadyStep.NAME))); + + // Update the policy to allow rollover at 1 document instead of 100 + createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + + // Index should now have been able to roll over, creating the new index and proceeding to the "complete" step + assertBusy(() -> assertThat(indexExists(index + "-000002"), is(true))); + assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1").getName(), equalTo(TerminalPolicyStep.KEY.getName()))); + } + // This method should be called inside an assertBusy, it has no retry logic of its own private void assertHistoryIsPresent(String policyName, String indexName, boolean success, String stepName) throws IOException { assertHistoryIsPresent(policyName, indexName, success, null, null, stepName); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleTransition.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleTransition.java index 816186882927c..0f6696b0be537 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleTransition.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleTransition.java @@ -251,8 +251,8 @@ private static LifecycleExecutionState updateExecutionStateToStep(LifecyclePolic /** * Given a cluster state and lifecycle state, return a new state using the new lifecycle state for the given index. */ - private static ClusterState.Builder newClusterStateWithLifecycleState(Index index, ClusterState clusterState, - LifecycleExecutionState lifecycleState) { + public static ClusterState.Builder newClusterStateWithLifecycleState(Index index, ClusterState clusterState, + LifecycleExecutionState lifecycleState) { ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState); newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData()) .put(IndexMetaData.builder(clusterState.getMetaData().index(index)) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java index be0a718c8bf10..d1d6205d786e4 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java @@ -8,34 +8,54 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ilm.ErrorStep; import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.PhaseExecutionInfo; +import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction; import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction.Request; import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction.Response; +import org.elasticsearch.xpack.ilm.IndexLifecycleTransition; import java.io.IOException; import java.time.Instant; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; +import java.util.Spliterators; import java.util.TreeMap; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** * This class is responsible for bootstrapping {@link IndexLifecycleMetadata} into the cluster-state, as well @@ -44,12 +64,17 @@ public class TransportPutLifecycleAction extends TransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportPutLifecycleAction.class); + private final NamedXContentRegistry xContentRegistry; + private final Client client; @Inject public TransportPutLifecycleAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + NamedXContentRegistry namedXContentRegistry, Client client) { super(PutLifecycleAction.NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + this.xContentRegistry = namedXContentRegistry; + this.client = client; } @Override @@ -81,7 +106,7 @@ protected Response newResponse(boolean acknowledged) { @Override public ClusterState execute(ClusterState currentState) throws Exception { - ClusterState.Builder newState = ClusterState.builder(currentState); + ClusterState.Builder stateBuilder = ClusterState.builder(currentState); IndexLifecycleMetadata currentMetadata = currentState.metaData().custom(IndexLifecycleMetadata.TYPE); if (currentMetadata == null) { // first time using index-lifecycle feature, bootstrap metadata currentMetadata = IndexLifecycleMetadata.EMPTY; @@ -99,13 +124,195 @@ public ClusterState execute(ClusterState currentState) throws Exception { logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName()); } IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode()); - newState.metaData(MetaData.builder(currentState.getMetaData()) + stateBuilder.metaData(MetaData.builder(currentState.getMetaData()) .putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build()); - return newState.build(); + ClusterState nonRefreshedState = stateBuilder.build(); + if (oldPolicy == null) { + return nonRefreshedState; + } else { + try { + return updateIndicesForPolicy(nonRefreshedState, xContentRegistry, client, + oldPolicy.getPolicy(), lifecyclePolicyMetadata); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("unable to refresh indices phase JSON for updated policy [{}]", + oldPolicy.getName()), e); + // Revert to the non-refreshed state + return nonRefreshedState; + } + } } }); } + /** + * Ensure that we have the minimum amount of metadata necessary to check for cache phase + * refresh. This includes: + * - An execution state + * - Existing phase definition JSON + * - A current step key + * - A current phase in the step key + * - Not currently in the ERROR step + */ + static boolean eligibleToCheckForRefresh(final IndexMetaData metaData) { + LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metaData); + if (executionState == null || executionState.getPhaseDefinition() == null) { + return false; + } + + Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState); + if (currentStepKey == null || currentStepKey.getPhase() == null) { + return false; + } + + return ErrorStep.NAME.equals(currentStepKey.getName()) == false; + } + + /** + * Parse the {@code phaseDef} phase definition to get the stepkeys for the given phase. + * If there is an error parsing or if the phase definition is missing the required + * information, returns null. + */ + @Nullable + static Set readStepKeys(final NamedXContentRegistry xContentRegistry, final Client client, + final String phaseDef, final String currentPhase) { + final PhaseExecutionInfo phaseExecutionInfo; + try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) { + phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase); + } catch (Exception e) { + logger.trace(new ParameterizedMessage("exception reading step keys checking for refreshability, phase definition: {}", + phaseDef), e); + return null; + } + + if (phaseExecutionInfo == null || phaseExecutionInfo.getPhase() == null) { + return null; + } + + return phaseExecutionInfo.getPhase().getActions().values().stream() + .flatMap(a -> a.toSteps(client, phaseExecutionInfo.getPhase().getName(), null).stream()) + .map(Step::getKey) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + /** + * Returns 'true' if the index's cached phase JSON can be safely reread, 'false' otherwise. + */ + static boolean isIndexPhaseDefinitionUpdatable(final NamedXContentRegistry xContentRegistry, final Client client, + final IndexMetaData metaData, final LifecyclePolicy newPolicy) { + final String index = metaData.getIndex().getName(); + if (eligibleToCheckForRefresh(metaData) == false) { + logger.debug("[{}] does not contain enough information to check for eligibility of refreshing phase", index); + return false; + } + final String policyId = newPolicy.getName(); + + final LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metaData); + final Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState); + final String currentPhase = currentStepKey.getPhase(); + + final Set newStepKeys = newPolicy.toSteps(client).stream() + .map(Step::getKey) + .collect(Collectors.toCollection(LinkedHashSet::new)); + + if (newStepKeys.contains(currentStepKey) == false) { + // The index is on a step that doesn't exist in the new policy, we + // can't safely re-read the JSON + logger.debug("[{}] updated policy [{}] does not contain the current step key [{}], so the policy phase will not be refreshed", + index, policyId, currentStepKey); + return false; + } + + final String phaseDef = executionState.getPhaseDefinition(); + final Set oldStepKeys = readStepKeys(xContentRegistry, client, phaseDef, currentPhase); + if (oldStepKeys == null) { + logger.debug("[{}] unable to parse phase definition for cached policy [{}], policy phase will not be refreshed", + index, policyId); + return false; + } + + final Set oldPhaseStepKeys = oldStepKeys.stream() + .filter(sk -> currentPhase.equals(sk.getPhase())) + .collect(Collectors.toCollection(LinkedHashSet::new)); + + final PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo(policyId, newPolicy.getPhases().get(currentPhase), 1L, 1L); + final String peiJson = Strings.toString(phaseExecutionInfo); + + final Set newPhaseStepKeys = readStepKeys(xContentRegistry, client, peiJson, currentPhase); + if (newPhaseStepKeys == null) { + logger.debug(new ParameterizedMessage("[{}] unable to parse phase definition for policy [{}] " + + "to determine if it could be refreshed", index, policyId)); + return false; + } + + if (newPhaseStepKeys.equals(oldPhaseStepKeys)) { + // The new and old phase have the same stepkeys for this current phase, so we can + // refresh the definition because we know it won't change the execution flow. + logger.debug("[{}] updated policy [{}] contains the same phase step keys and can be refreshed", index, policyId); + return true; + } else { + logger.debug("[{}] updated policy [{}] has different phase step keys and will NOT refresh phase " + + "definition as it differs too greatly. old: {}, new: {}", + index, policyId, oldPhaseStepKeys, newPhaseStepKeys); + return false; + } + } + + /** + * Rereads the phase JSON for the given index, returning a new cluster state. + */ + static ClusterState refreshPhaseDefinition(final ClusterState state, final String index, final LifecyclePolicyMetadata updatedPolicy) { + final IndexMetaData idxMeta = state.metaData().index(index); + assert eligibleToCheckForRefresh(idxMeta) : "index " + index + " is missing crucial information needed to refresh phase definition"; + + logger.trace("[{}] updating cached phase definition for policy [{}]", index, updatedPolicy.getName()); + LifecycleExecutionState currentExState = LifecycleExecutionState.fromIndexMetadata(idxMeta); + + String currentPhase = currentExState.getPhase(); + PhaseExecutionInfo pei = new PhaseExecutionInfo(updatedPolicy.getName(), + updatedPolicy.getPolicy().getPhases().get(currentPhase), updatedPolicy.getVersion(), updatedPolicy.getModifiedDate()); + + LifecycleExecutionState newExState = LifecycleExecutionState.builder(currentExState) + .setPhaseDefinition(Strings.toString(pei, false, false)) + .build(); + + return IndexLifecycleTransition.newClusterStateWithLifecycleState(idxMeta.getIndex(), state, newExState).build(); + } + + /** + * For the given new policy, returns a new cluster with all updateable indices' phase JSON refreshed. + */ + static ClusterState updateIndicesForPolicy(final ClusterState state, final NamedXContentRegistry xContentRegistry, final Client client, + final LifecyclePolicy oldPolicy, final LifecyclePolicyMetadata newPolicy) { + assert oldPolicy.getName().equals(newPolicy.getName()) : "expected both policies to have the same id but they were: [" + + oldPolicy.getName() + "] vs. [" + newPolicy.getName() + "]"; + + // No need to update anything if the policies are identical in contents + if (oldPolicy.equals(newPolicy.getPolicy())) { + logger.debug("policy [{}] is unchanged and no phase definition refresh is needed", oldPolicy.getName()); + return state; + } + + final List indicesThatCanBeUpdated = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(state.metaData().indices().valuesIt(), 0), false) + .filter(meta -> newPolicy.getName().equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(meta.getSettings()))) + .filter(meta -> isIndexPhaseDefinitionUpdatable(xContentRegistry, client, meta, newPolicy.getPolicy())) + .map(meta -> meta.getIndex().getName()) + .collect(Collectors.toList()); + + ClusterState updatedState = state; + for (String index : indicesThatCanBeUpdated) { + try { + updatedState = refreshPhaseDefinition(updatedState, index, newPolicy); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("[{}] unable to refresh phase definition for updated policy [{}]", + index, newPolicy.getName()), e); + } + } + + return updatedState; + } + @Override protected ClusterBlockException checkBlock(Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java new file mode 100644 index 0000000000000..f2f67fa281f90 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java @@ -0,0 +1,499 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.action; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ilm.AllocateAction; +import org.elasticsearch.xpack.core.ilm.AllocationRoutedStep; +import org.elasticsearch.xpack.core.ilm.ErrorStep; +import org.elasticsearch.xpack.core.ilm.ForceMergeAction; +import org.elasticsearch.xpack.core.ilm.FreezeAction; +import org.elasticsearch.xpack.core.ilm.LifecycleAction; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.PhaseExecutionInfo; +import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; +import org.elasticsearch.xpack.core.ilm.RolloverAction; +import org.elasticsearch.xpack.core.ilm.RolloverStep; +import org.elasticsearch.xpack.core.ilm.SegmentCountStep; +import org.elasticsearch.xpack.core.ilm.SetPriorityAction; +import org.elasticsearch.xpack.core.ilm.Step; +import org.elasticsearch.xpack.core.ilm.UpdateRolloverLifecycleDateStep; +import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep; +import org.elasticsearch.xpack.ilm.IndexLifecycle; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class TransportPutLifecycleActionTests extends ESTestCase { + private static final NamedXContentRegistry REGISTRY; + private static final Client client = mock(Client.class); + private static final String index = "eggplant"; + + static { + try (IndexLifecycle indexLifecycle = new IndexLifecycle(Settings.EMPTY)) { + List entries = new ArrayList<>(indexLifecycle.getNamedXContent()); + REGISTRY = new NamedXContentRegistry(entries); + } + } + + public void testEligibleForRefresh() { + IndexMetaData meta = mkMeta().build(); + assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + + LifecycleExecutionState state = LifecycleExecutionState.builder().build(); + meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build(); + assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + + state = LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("step") + .build(); + meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build(); + assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + + state = LifecycleExecutionState.builder() + .setPhaseDefinition("{}") + .build(); + meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build(); + assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + + state = LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep(ErrorStep.NAME) + .setPhaseDefinition("{}") + .build(); + meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build(); + assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + + state = LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("step") + .setPhaseDefinition("{}") + .build(); + meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build(); + assertTrue(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + } + + public void testReadStepKeys() { + assertNull(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "{}", "phase")); + assertNull(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "aoeu", "phase")); + assertNull(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "", "phase")); + + assertThat(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "{\n" + + " \"policy\": \"my_lifecycle3\",\n" + + " \"phase_definition\": { \n" + + " \"min_age\": \"0ms\",\n" + + " \"actions\": {\n" + + " \"rollover\": {\n" + + " \"max_age\": \"30s\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\": 3, \n" + + " \"modified_date_in_millis\": 1539609701576 \n" + + " }", "phase"), + contains(new Step.StepKey("phase", "rollover", WaitForRolloverReadyStep.NAME), + new Step.StepKey("phase", "rollover", RolloverStep.NAME), + new Step.StepKey("phase", "rollover", UpdateRolloverLifecycleDateStep.NAME), + new Step.StepKey("phase", "rollover", RolloverAction.INDEXING_COMPLETE_STEP_NAME))); + + assertThat(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "{\n" + + " \"policy\" : \"my_lifecycle3\",\n" + + " \"phase_definition\" : {\n" + + " \"min_age\" : \"20m\",\n" + + " \"actions\" : {\n" + + " \"rollover\" : {\n" + + " \"max_age\" : \"5s\"\n" + + " },\n" + + " \"set_priority\" : {\n" + + " \"priority\" : 150\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\" : 1,\n" + + " \"modified_date_in_millis\" : 1578521007076\n" + + " }", "phase"), + contains(new Step.StepKey("phase", "rollover", WaitForRolloverReadyStep.NAME), + new Step.StepKey("phase", "rollover", RolloverStep.NAME), + new Step.StepKey("phase", "rollover", UpdateRolloverLifecycleDateStep.NAME), + new Step.StepKey("phase", "rollover", RolloverAction.INDEXING_COMPLETE_STEP_NAME), + new Step.StepKey("phase", "set_priority", SetPriorityAction.NAME))); + + Map actions = new HashMap<>(); + actions.put("forcemerge", new ForceMergeAction(5)); + actions.put("freeze", new FreezeAction()); + actions.put("allocate", new AllocateAction(1, null, null, null)); + PhaseExecutionInfo pei = new PhaseExecutionInfo("policy", new Phase("wonky", TimeValue.ZERO, actions), 1, 1); + String phaseDef = Strings.toString(pei); + logger.info("--> phaseDef: {}", phaseDef); + + assertThat(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, phaseDef, "phase"), + contains(new Step.StepKey("phase", "freeze", FreezeAction.NAME), + new Step.StepKey("phase", "allocate", AllocateAction.NAME), + new Step.StepKey("phase", "allocate", AllocationRoutedStep.NAME), + new Step.StepKey("phase", "forcemerge", ReadOnlyAction.NAME), + new Step.StepKey("phase", "forcemerge", ForceMergeAction.NAME), + new Step.StepKey("phase", "forcemerge", SegmentCountStep.NAME))); + } + + public void testIndexCanBeSafelyUpdated() { + + // Success case, it can be updated even though the configuration for the + // rollover and set_priority actions has changed + { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhase("hot") + .setAction("rollover") + .setStep("check-rollover-ready") + .setPhaseDefinition("{\n" + + " \"policy\" : \"my-policy\",\n" + + " \"phase_definition\" : {\n" + + " \"min_age\" : \"20m\",\n" + + " \"actions\" : {\n" + + " \"rollover\" : {\n" + + " \"max_age\" : \"5s\"\n" + + " },\n" + + " \"set_priority\" : {\n" + + " \"priority\" : 150\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\" : 1,\n" + + " \"modified_date_in_millis\" : 1578521007076\n" + + " }") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + Map actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, null, 1L)); + actions.put("set_priority", new SetPriorityAction(100)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + + assertTrue(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy)); + } + + // Failure case, can't update because the step we're currently on has been removed in the new policy + { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhase("hot") + .setAction("rollover") + .setStep("check-rollover-ready") + .setPhaseDefinition("{\n" + + " \"policy\" : \"my-policy\",\n" + + " \"phase_definition\" : {\n" + + " \"min_age\" : \"20m\",\n" + + " \"actions\" : {\n" + + " \"rollover\" : {\n" + + " \"max_age\" : \"5s\"\n" + + " },\n" + + " \"set_priority\" : {\n" + + " \"priority\" : 150\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\" : 1,\n" + + " \"modified_date_in_millis\" : 1578521007076\n" + + " }") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + Map actions = new HashMap<>(); + actions.put("set_priority", new SetPriorityAction(150)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + + assertFalse(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy)); + } + + // Failure case, can't update because the future step has been deleted + { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhase("hot") + .setAction("rollover") + .setStep("check-rollover-ready") + .setPhaseDefinition("{\n" + + " \"policy\" : \"my-policy\",\n" + + " \"phase_definition\" : {\n" + + " \"min_age\" : \"20m\",\n" + + " \"actions\" : {\n" + + " \"rollover\" : {\n" + + " \"max_age\" : \"5s\"\n" + + " },\n" + + " \"set_priority\" : {\n" + + " \"priority\" : 150\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\" : 1,\n" + + " \"modified_date_in_millis\" : 1578521007076\n" + + " }") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + Map actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, TimeValue.timeValueSeconds(5), null)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + + assertFalse(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy)); + } + + // Failure case, index doesn't have enough info to check + { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhaseDefinition("{\n" + + " \"policy\" : \"my-policy\",\n" + + " \"phase_definition\" : {\n" + + " \"min_age\" : \"20m\",\n" + + " \"actions\" : {\n" + + " \"rollover\" : {\n" + + " \"max_age\" : \"5s\"\n" + + " },\n" + + " \"set_priority\" : {\n" + + " \"priority\" : 150\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\" : 1,\n" + + " \"modified_date_in_millis\" : 1578521007076\n" + + " }") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + Map actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, null, 1L)); + actions.put("set_priority", new SetPriorityAction(100)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + + assertFalse(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy)); + } + + // Failure case, the phase JSON is unparseable + { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhase("hot") + .setAction("rollover") + .setStep("check-rollover-ready") + .setPhaseDefinition("potato") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + Map actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, null, 1L)); + actions.put("set_priority", new SetPriorityAction(100)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + + assertFalse(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy)); + } + } + + public void testRefreshPhaseJson() { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhase("hot") + .setAction("rollover") + .setStep("check-rollover-ready") + .setPhaseDefinition("{\n" + + " \"policy\" : \"my-policy\",\n" + + " \"phase_definition\" : {\n" + + " \"min_age\" : \"20m\",\n" + + " \"actions\" : {\n" + + " \"rollover\" : {\n" + + " \"max_age\" : \"5s\"\n" + + " },\n" + + " \"set_priority\" : {\n" + + " \"priority\" : 150\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\" : 1,\n" + + " \"modified_date_in_millis\" : 1578521007076\n" + + " }") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + Map actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, null, 1L)); + actions.put("set_priority", new SetPriorityAction(100)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap(), 2L, 2L); + + ClusterState existingState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metaData(MetaData.builder(MetaData.EMPTY_META_DATA) + .put(meta, false) + .build()) + .build(); + + ClusterState changedState = TransportPutLifecycleAction.refreshPhaseDefinition(existingState, index, policyMetadata); + + IndexMetaData newIdxMeta = changedState.metaData().index(index); + LifecycleExecutionState afterExState = LifecycleExecutionState.fromIndexMetadata(newIdxMeta); + Map beforeState = new HashMap<>(exState.asMap()); + beforeState.remove("phase_definition"); + Map afterState = new HashMap<>(afterExState.asMap()); + afterState.remove("phase_definition"); + // Check that no other execution state changes have been made + assertThat(beforeState, equalTo(afterState)); + + // Check that the phase definition has been refreshed + assertThat(afterExState.getPhaseDefinition(), + equalTo("{\"policy\":\"my-policy\",\"phase_definition\":{\"min_age\":\"0ms\",\"actions\":{\"rollover\":{\"max_docs\":1}," + + "\"set_priority\":{\"priority\":100}}},\"version\":2,\"modified_date_in_millis\":2}")); + } + + public void testUpdateIndicesForPolicy() { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhase("hot") + .setAction("rollover") + .setStep("check-rollover-ready") + .setPhaseDefinition("{\"policy\":\"my-policy\",\"phase_definition\":{\"min_age\":\"0ms\",\"actions\":{\"rollover\":" + + "{\"max_docs\":1},\"set_priority\":{\"priority\":100}}},\"version\":1,\"modified_date_in_millis\":1578521007076}") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + assertTrue(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + + Map oldActions = new HashMap<>(); + oldActions.put("rollover", new RolloverAction(null, null, 1L)); + oldActions.put("set_priority", new SetPriorityAction(100)); + Phase oldHotPhase = new Phase("hot", TimeValue.ZERO, oldActions); + Map oldPhases = Collections.singletonMap("hot", oldHotPhase); + LifecyclePolicy oldPolicy = new LifecyclePolicy("my-policy", oldPhases); + + Map actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, null, 1L)); + actions.put("set_priority", new SetPriorityAction(100)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap(), 2L, 2L); + + assertTrue(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy)); + + ClusterState existingState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metaData(MetaData.builder(MetaData.EMPTY_META_DATA) + .put(meta, false) + .build()) + .build(); + + logger.info("--> update for unchanged policy"); + ClusterState updatedState = TransportPutLifecycleAction.updateIndicesForPolicy(existingState, REGISTRY, + client, oldPolicy, policyMetadata); + + // No change, because the policies were identical + assertThat(updatedState, equalTo(existingState)); + + actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, null, 2L)); + actions.put("set_priority", new SetPriorityAction(150)); + hotPhase = new Phase("hot", TimeValue.ZERO, actions); + phases = Collections.singletonMap("hot", hotPhase); + newPolicy = new LifecyclePolicy("my-policy", phases); + policyMetadata = new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap(), 2L, 2L); + + logger.info("--> update with changed policy, but not configured in settings"); + updatedState = TransportPutLifecycleAction.updateIndicesForPolicy(existingState, REGISTRY, client, oldPolicy, policyMetadata); + + // No change, because the index doesn't have a lifecycle.name setting for this policy + assertThat(updatedState, equalTo(existingState)); + + meta = IndexMetaData.builder(index) + .settings(Settings.builder() + .put(LifecycleSettings.LIFECYCLE_NAME, "my-policy") + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 5)) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, randomAlphaOfLength(5))) + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + existingState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metaData(MetaData.builder(MetaData.EMPTY_META_DATA) + .put(meta, false) + .build()) + .build(); + + logger.info("--> update with changed policy and this index has the policy"); + updatedState = TransportPutLifecycleAction.updateIndicesForPolicy(existingState, REGISTRY, client, oldPolicy, policyMetadata); + + IndexMetaData newIdxMeta = updatedState.metaData().index(index); + LifecycleExecutionState afterExState = LifecycleExecutionState.fromIndexMetadata(newIdxMeta); + Map beforeState = new HashMap<>(exState.asMap()); + beforeState.remove("phase_definition"); + Map afterState = new HashMap<>(afterExState.asMap()); + afterState.remove("phase_definition"); + // Check that no other execution state changes have been made + assertThat(beforeState, equalTo(afterState)); + + // Check that the phase definition has been refreshed + assertThat(afterExState.getPhaseDefinition(), + equalTo("{\"policy\":\"my-policy\",\"phase_definition\":{\"min_age\":\"0ms\",\"actions\":{\"rollover\":{\"max_docs\":2}," + + "\"set_priority\":{\"priority\":150}}},\"version\":2,\"modified_date_in_millis\":2}")); + } + + private static IndexMetaData.Builder mkMeta() { + return IndexMetaData.builder(index) + .settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 5)) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, randomAlphaOfLength(5))); + } +}