From e92534682db9362027ad316868f630e6821ddb59 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 13 Jan 2020 13:44:56 -0700 Subject: [PATCH] =?UTF-8?q?Refresh=20cached=20phase=20policy=20definition?= =?UTF-8?q?=20if=20possible=20on=20new=20poli=E2=80=A6=20(#50820)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refresh cached phase policy definition if possible on new policy There are some cases when updating a policy does not change the structure in a significant way. In these cases, we can reread the policy definition for any indices using the updated policy. This commit adds this refreshing to the `TransportPutLifecycleAction` to allow this. It allows us to do things like change the configuration values for a particular step, even when on that step (for example, changing the rollover criteria while on the `check-rollover-ready` step). There are more cases where the phase definition can be reread that just the ones checked here (for example, removing an action that has already been passed), and those will be added in subsequent work. Relates to #48431 --- .../reference/ilm/policy-definitions.asciidoc | 14 +- .../ilm/TimeSeriesLifecycleActionsIT.java | 35 ++ .../xpack/ilm/IndexLifecycleTransition.java | 4 +- .../action/TransportPutLifecycleAction.java | 215 +++++++- .../TransportPutLifecycleActionTests.java | 499 ++++++++++++++++++ 5 files changed, 756 insertions(+), 11 deletions(-) create mode 100644 x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java diff --git a/docs/reference/ilm/policy-definitions.asciidoc b/docs/reference/ilm/policy-definitions.asciidoc index 7114516e4d2a9..67fcdba9b76ef 100644 --- a/docs/reference/ilm/policy-definitions.asciidoc +++ b/docs/reference/ilm/policy-definitions.asciidoc @@ -55,7 +55,7 @@ PUT _ilm/policy/my_policy } -------------------------------------------------- -The Above example configures a policy that moves the index into the warm +The above example configures a policy that moves the index into the warm phase after one day. Until then, the index is in a waiting state. After moving into the warm phase, it will wait until 30 days have elapsed before moving to the delete phase and deleting the index. @@ -76,10 +76,14 @@ check occurs. === Phase Execution The current phase definition, of an index's policy being executed, is stored -in the index's metadata. The phase and its actions are compiled into a series -of discrete steps that are executed sequentially. Since some {ilm-init} actions -are more complex and involve multiple operations against an index, each of these -operations are done in isolation in a unit called a "step". The +in the index's metadata. This phase definition is cached to prevent changes to +the policy from putting the index in a state where it cannot proceed from its +current step. When the policy is updated we check to see if this phase +definition can be safely updated, and if so, update the cached definition in +indices using the updated policy. The phase and its actions are compiled into a +series of discrete steps that are executed sequentially. Since some {ilm-init} +actions are more complex and involve multiple operations against an index, each +of these operations are done in isolation in a unit called a "step". The <> exposes this information to us to see which step our index is either to execute next, or is currently executing. diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index ca35a6bee0e03..b7fc5bdcb69e8 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -1304,6 +1304,41 @@ public void testRetryableInitializationStep() throws Exception { }); } + public void testRefreshablePhaseJson() throws Exception { + String index = "refresh-index"; + + createNewSingletonPolicy("hot", new RolloverAction(null, null, 100L)); + Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\""+ index + "-*\"], \n" + + " \"settings\": {\n" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 0,\n" + + " \"index.lifecycle.name\": \"" + policy+ "\",\n" + + " \"index.lifecycle.rollover_alias\": \"alias\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + createIndexWithSettings(index + "-1", + Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0), + true); + + // Index a document + index(client(), index + "-1", "1", "foo", "bar"); + + // Wait for the index to enter the check-rollover-ready step + assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1").getName(), equalTo(WaitForRolloverReadyStep.NAME))); + + // Update the policy to allow rollover at 1 document instead of 100 + createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + + // Index should now have been able to roll over, creating the new index and proceeding to the "complete" step + assertBusy(() -> assertThat(indexExists(index + "-000002"), is(true))); + assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1").getName(), equalTo(TerminalPolicyStep.KEY.getName()))); + } + // This method should be called inside an assertBusy, it has no retry logic of its own private void assertHistoryIsPresent(String policyName, String indexName, boolean success, String stepName) throws IOException { assertHistoryIsPresent(policyName, indexName, success, null, null, stepName); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleTransition.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleTransition.java index 816186882927c..0f6696b0be537 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleTransition.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleTransition.java @@ -251,8 +251,8 @@ private static LifecycleExecutionState updateExecutionStateToStep(LifecyclePolic /** * Given a cluster state and lifecycle state, return a new state using the new lifecycle state for the given index. */ - private static ClusterState.Builder newClusterStateWithLifecycleState(Index index, ClusterState clusterState, - LifecycleExecutionState lifecycleState) { + public static ClusterState.Builder newClusterStateWithLifecycleState(Index index, ClusterState clusterState, + LifecycleExecutionState lifecycleState) { ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState); newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData()) .put(IndexMetaData.builder(clusterState.getMetaData().index(index)) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java index be0a718c8bf10..d1d6205d786e4 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java @@ -8,34 +8,54 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ilm.ErrorStep; import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.PhaseExecutionInfo; +import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction; import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction.Request; import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction.Response; +import org.elasticsearch.xpack.ilm.IndexLifecycleTransition; import java.io.IOException; import java.time.Instant; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; +import java.util.Spliterators; import java.util.TreeMap; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** * This class is responsible for bootstrapping {@link IndexLifecycleMetadata} into the cluster-state, as well @@ -44,12 +64,17 @@ public class TransportPutLifecycleAction extends TransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportPutLifecycleAction.class); + private final NamedXContentRegistry xContentRegistry; + private final Client client; @Inject public TransportPutLifecycleAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + NamedXContentRegistry namedXContentRegistry, Client client) { super(PutLifecycleAction.NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + this.xContentRegistry = namedXContentRegistry; + this.client = client; } @Override @@ -81,7 +106,7 @@ protected Response newResponse(boolean acknowledged) { @Override public ClusterState execute(ClusterState currentState) throws Exception { - ClusterState.Builder newState = ClusterState.builder(currentState); + ClusterState.Builder stateBuilder = ClusterState.builder(currentState); IndexLifecycleMetadata currentMetadata = currentState.metaData().custom(IndexLifecycleMetadata.TYPE); if (currentMetadata == null) { // first time using index-lifecycle feature, bootstrap metadata currentMetadata = IndexLifecycleMetadata.EMPTY; @@ -99,13 +124,195 @@ public ClusterState execute(ClusterState currentState) throws Exception { logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName()); } IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode()); - newState.metaData(MetaData.builder(currentState.getMetaData()) + stateBuilder.metaData(MetaData.builder(currentState.getMetaData()) .putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build()); - return newState.build(); + ClusterState nonRefreshedState = stateBuilder.build(); + if (oldPolicy == null) { + return nonRefreshedState; + } else { + try { + return updateIndicesForPolicy(nonRefreshedState, xContentRegistry, client, + oldPolicy.getPolicy(), lifecyclePolicyMetadata); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("unable to refresh indices phase JSON for updated policy [{}]", + oldPolicy.getName()), e); + // Revert to the non-refreshed state + return nonRefreshedState; + } + } } }); } + /** + * Ensure that we have the minimum amount of metadata necessary to check for cache phase + * refresh. This includes: + * - An execution state + * - Existing phase definition JSON + * - A current step key + * - A current phase in the step key + * - Not currently in the ERROR step + */ + static boolean eligibleToCheckForRefresh(final IndexMetaData metaData) { + LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metaData); + if (executionState == null || executionState.getPhaseDefinition() == null) { + return false; + } + + Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState); + if (currentStepKey == null || currentStepKey.getPhase() == null) { + return false; + } + + return ErrorStep.NAME.equals(currentStepKey.getName()) == false; + } + + /** + * Parse the {@code phaseDef} phase definition to get the stepkeys for the given phase. + * If there is an error parsing or if the phase definition is missing the required + * information, returns null. + */ + @Nullable + static Set readStepKeys(final NamedXContentRegistry xContentRegistry, final Client client, + final String phaseDef, final String currentPhase) { + final PhaseExecutionInfo phaseExecutionInfo; + try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) { + phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase); + } catch (Exception e) { + logger.trace(new ParameterizedMessage("exception reading step keys checking for refreshability, phase definition: {}", + phaseDef), e); + return null; + } + + if (phaseExecutionInfo == null || phaseExecutionInfo.getPhase() == null) { + return null; + } + + return phaseExecutionInfo.getPhase().getActions().values().stream() + .flatMap(a -> a.toSteps(client, phaseExecutionInfo.getPhase().getName(), null).stream()) + .map(Step::getKey) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + /** + * Returns 'true' if the index's cached phase JSON can be safely reread, 'false' otherwise. + */ + static boolean isIndexPhaseDefinitionUpdatable(final NamedXContentRegistry xContentRegistry, final Client client, + final IndexMetaData metaData, final LifecyclePolicy newPolicy) { + final String index = metaData.getIndex().getName(); + if (eligibleToCheckForRefresh(metaData) == false) { + logger.debug("[{}] does not contain enough information to check for eligibility of refreshing phase", index); + return false; + } + final String policyId = newPolicy.getName(); + + final LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metaData); + final Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState); + final String currentPhase = currentStepKey.getPhase(); + + final Set newStepKeys = newPolicy.toSteps(client).stream() + .map(Step::getKey) + .collect(Collectors.toCollection(LinkedHashSet::new)); + + if (newStepKeys.contains(currentStepKey) == false) { + // The index is on a step that doesn't exist in the new policy, we + // can't safely re-read the JSON + logger.debug("[{}] updated policy [{}] does not contain the current step key [{}], so the policy phase will not be refreshed", + index, policyId, currentStepKey); + return false; + } + + final String phaseDef = executionState.getPhaseDefinition(); + final Set oldStepKeys = readStepKeys(xContentRegistry, client, phaseDef, currentPhase); + if (oldStepKeys == null) { + logger.debug("[{}] unable to parse phase definition for cached policy [{}], policy phase will not be refreshed", + index, policyId); + return false; + } + + final Set oldPhaseStepKeys = oldStepKeys.stream() + .filter(sk -> currentPhase.equals(sk.getPhase())) + .collect(Collectors.toCollection(LinkedHashSet::new)); + + final PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo(policyId, newPolicy.getPhases().get(currentPhase), 1L, 1L); + final String peiJson = Strings.toString(phaseExecutionInfo); + + final Set newPhaseStepKeys = readStepKeys(xContentRegistry, client, peiJson, currentPhase); + if (newPhaseStepKeys == null) { + logger.debug(new ParameterizedMessage("[{}] unable to parse phase definition for policy [{}] " + + "to determine if it could be refreshed", index, policyId)); + return false; + } + + if (newPhaseStepKeys.equals(oldPhaseStepKeys)) { + // The new and old phase have the same stepkeys for this current phase, so we can + // refresh the definition because we know it won't change the execution flow. + logger.debug("[{}] updated policy [{}] contains the same phase step keys and can be refreshed", index, policyId); + return true; + } else { + logger.debug("[{}] updated policy [{}] has different phase step keys and will NOT refresh phase " + + "definition as it differs too greatly. old: {}, new: {}", + index, policyId, oldPhaseStepKeys, newPhaseStepKeys); + return false; + } + } + + /** + * Rereads the phase JSON for the given index, returning a new cluster state. + */ + static ClusterState refreshPhaseDefinition(final ClusterState state, final String index, final LifecyclePolicyMetadata updatedPolicy) { + final IndexMetaData idxMeta = state.metaData().index(index); + assert eligibleToCheckForRefresh(idxMeta) : "index " + index + " is missing crucial information needed to refresh phase definition"; + + logger.trace("[{}] updating cached phase definition for policy [{}]", index, updatedPolicy.getName()); + LifecycleExecutionState currentExState = LifecycleExecutionState.fromIndexMetadata(idxMeta); + + String currentPhase = currentExState.getPhase(); + PhaseExecutionInfo pei = new PhaseExecutionInfo(updatedPolicy.getName(), + updatedPolicy.getPolicy().getPhases().get(currentPhase), updatedPolicy.getVersion(), updatedPolicy.getModifiedDate()); + + LifecycleExecutionState newExState = LifecycleExecutionState.builder(currentExState) + .setPhaseDefinition(Strings.toString(pei, false, false)) + .build(); + + return IndexLifecycleTransition.newClusterStateWithLifecycleState(idxMeta.getIndex(), state, newExState).build(); + } + + /** + * For the given new policy, returns a new cluster with all updateable indices' phase JSON refreshed. + */ + static ClusterState updateIndicesForPolicy(final ClusterState state, final NamedXContentRegistry xContentRegistry, final Client client, + final LifecyclePolicy oldPolicy, final LifecyclePolicyMetadata newPolicy) { + assert oldPolicy.getName().equals(newPolicy.getName()) : "expected both policies to have the same id but they were: [" + + oldPolicy.getName() + "] vs. [" + newPolicy.getName() + "]"; + + // No need to update anything if the policies are identical in contents + if (oldPolicy.equals(newPolicy.getPolicy())) { + logger.debug("policy [{}] is unchanged and no phase definition refresh is needed", oldPolicy.getName()); + return state; + } + + final List indicesThatCanBeUpdated = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(state.metaData().indices().valuesIt(), 0), false) + .filter(meta -> newPolicy.getName().equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(meta.getSettings()))) + .filter(meta -> isIndexPhaseDefinitionUpdatable(xContentRegistry, client, meta, newPolicy.getPolicy())) + .map(meta -> meta.getIndex().getName()) + .collect(Collectors.toList()); + + ClusterState updatedState = state; + for (String index : indicesThatCanBeUpdated) { + try { + updatedState = refreshPhaseDefinition(updatedState, index, newPolicy); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("[{}] unable to refresh phase definition for updated policy [{}]", + index, newPolicy.getName()), e); + } + } + + return updatedState; + } + @Override protected ClusterBlockException checkBlock(Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java new file mode 100644 index 0000000000000..f2f67fa281f90 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java @@ -0,0 +1,499 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.action; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ilm.AllocateAction; +import org.elasticsearch.xpack.core.ilm.AllocationRoutedStep; +import org.elasticsearch.xpack.core.ilm.ErrorStep; +import org.elasticsearch.xpack.core.ilm.ForceMergeAction; +import org.elasticsearch.xpack.core.ilm.FreezeAction; +import org.elasticsearch.xpack.core.ilm.LifecycleAction; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.PhaseExecutionInfo; +import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; +import org.elasticsearch.xpack.core.ilm.RolloverAction; +import org.elasticsearch.xpack.core.ilm.RolloverStep; +import org.elasticsearch.xpack.core.ilm.SegmentCountStep; +import org.elasticsearch.xpack.core.ilm.SetPriorityAction; +import org.elasticsearch.xpack.core.ilm.Step; +import org.elasticsearch.xpack.core.ilm.UpdateRolloverLifecycleDateStep; +import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep; +import org.elasticsearch.xpack.ilm.IndexLifecycle; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class TransportPutLifecycleActionTests extends ESTestCase { + private static final NamedXContentRegistry REGISTRY; + private static final Client client = mock(Client.class); + private static final String index = "eggplant"; + + static { + try (IndexLifecycle indexLifecycle = new IndexLifecycle(Settings.EMPTY)) { + List entries = new ArrayList<>(indexLifecycle.getNamedXContent()); + REGISTRY = new NamedXContentRegistry(entries); + } + } + + public void testEligibleForRefresh() { + IndexMetaData meta = mkMeta().build(); + assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + + LifecycleExecutionState state = LifecycleExecutionState.builder().build(); + meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build(); + assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + + state = LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("step") + .build(); + meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build(); + assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + + state = LifecycleExecutionState.builder() + .setPhaseDefinition("{}") + .build(); + meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build(); + assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + + state = LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep(ErrorStep.NAME) + .setPhaseDefinition("{}") + .build(); + meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build(); + assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + + state = LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("step") + .setPhaseDefinition("{}") + .build(); + meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build(); + assertTrue(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + } + + public void testReadStepKeys() { + assertNull(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "{}", "phase")); + assertNull(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "aoeu", "phase")); + assertNull(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "", "phase")); + + assertThat(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "{\n" + + " \"policy\": \"my_lifecycle3\",\n" + + " \"phase_definition\": { \n" + + " \"min_age\": \"0ms\",\n" + + " \"actions\": {\n" + + " \"rollover\": {\n" + + " \"max_age\": \"30s\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\": 3, \n" + + " \"modified_date_in_millis\": 1539609701576 \n" + + " }", "phase"), + contains(new Step.StepKey("phase", "rollover", WaitForRolloverReadyStep.NAME), + new Step.StepKey("phase", "rollover", RolloverStep.NAME), + new Step.StepKey("phase", "rollover", UpdateRolloverLifecycleDateStep.NAME), + new Step.StepKey("phase", "rollover", RolloverAction.INDEXING_COMPLETE_STEP_NAME))); + + assertThat(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "{\n" + + " \"policy\" : \"my_lifecycle3\",\n" + + " \"phase_definition\" : {\n" + + " \"min_age\" : \"20m\",\n" + + " \"actions\" : {\n" + + " \"rollover\" : {\n" + + " \"max_age\" : \"5s\"\n" + + " },\n" + + " \"set_priority\" : {\n" + + " \"priority\" : 150\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\" : 1,\n" + + " \"modified_date_in_millis\" : 1578521007076\n" + + " }", "phase"), + contains(new Step.StepKey("phase", "rollover", WaitForRolloverReadyStep.NAME), + new Step.StepKey("phase", "rollover", RolloverStep.NAME), + new Step.StepKey("phase", "rollover", UpdateRolloverLifecycleDateStep.NAME), + new Step.StepKey("phase", "rollover", RolloverAction.INDEXING_COMPLETE_STEP_NAME), + new Step.StepKey("phase", "set_priority", SetPriorityAction.NAME))); + + Map actions = new HashMap<>(); + actions.put("forcemerge", new ForceMergeAction(5)); + actions.put("freeze", new FreezeAction()); + actions.put("allocate", new AllocateAction(1, null, null, null)); + PhaseExecutionInfo pei = new PhaseExecutionInfo("policy", new Phase("wonky", TimeValue.ZERO, actions), 1, 1); + String phaseDef = Strings.toString(pei); + logger.info("--> phaseDef: {}", phaseDef); + + assertThat(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, phaseDef, "phase"), + contains(new Step.StepKey("phase", "freeze", FreezeAction.NAME), + new Step.StepKey("phase", "allocate", AllocateAction.NAME), + new Step.StepKey("phase", "allocate", AllocationRoutedStep.NAME), + new Step.StepKey("phase", "forcemerge", ReadOnlyAction.NAME), + new Step.StepKey("phase", "forcemerge", ForceMergeAction.NAME), + new Step.StepKey("phase", "forcemerge", SegmentCountStep.NAME))); + } + + public void testIndexCanBeSafelyUpdated() { + + // Success case, it can be updated even though the configuration for the + // rollover and set_priority actions has changed + { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhase("hot") + .setAction("rollover") + .setStep("check-rollover-ready") + .setPhaseDefinition("{\n" + + " \"policy\" : \"my-policy\",\n" + + " \"phase_definition\" : {\n" + + " \"min_age\" : \"20m\",\n" + + " \"actions\" : {\n" + + " \"rollover\" : {\n" + + " \"max_age\" : \"5s\"\n" + + " },\n" + + " \"set_priority\" : {\n" + + " \"priority\" : 150\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\" : 1,\n" + + " \"modified_date_in_millis\" : 1578521007076\n" + + " }") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + Map actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, null, 1L)); + actions.put("set_priority", new SetPriorityAction(100)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + + assertTrue(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy)); + } + + // Failure case, can't update because the step we're currently on has been removed in the new policy + { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhase("hot") + .setAction("rollover") + .setStep("check-rollover-ready") + .setPhaseDefinition("{\n" + + " \"policy\" : \"my-policy\",\n" + + " \"phase_definition\" : {\n" + + " \"min_age\" : \"20m\",\n" + + " \"actions\" : {\n" + + " \"rollover\" : {\n" + + " \"max_age\" : \"5s\"\n" + + " },\n" + + " \"set_priority\" : {\n" + + " \"priority\" : 150\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\" : 1,\n" + + " \"modified_date_in_millis\" : 1578521007076\n" + + " }") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + Map actions = new HashMap<>(); + actions.put("set_priority", new SetPriorityAction(150)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + + assertFalse(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy)); + } + + // Failure case, can't update because the future step has been deleted + { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhase("hot") + .setAction("rollover") + .setStep("check-rollover-ready") + .setPhaseDefinition("{\n" + + " \"policy\" : \"my-policy\",\n" + + " \"phase_definition\" : {\n" + + " \"min_age\" : \"20m\",\n" + + " \"actions\" : {\n" + + " \"rollover\" : {\n" + + " \"max_age\" : \"5s\"\n" + + " },\n" + + " \"set_priority\" : {\n" + + " \"priority\" : 150\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\" : 1,\n" + + " \"modified_date_in_millis\" : 1578521007076\n" + + " }") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + Map actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, TimeValue.timeValueSeconds(5), null)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + + assertFalse(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy)); + } + + // Failure case, index doesn't have enough info to check + { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhaseDefinition("{\n" + + " \"policy\" : \"my-policy\",\n" + + " \"phase_definition\" : {\n" + + " \"min_age\" : \"20m\",\n" + + " \"actions\" : {\n" + + " \"rollover\" : {\n" + + " \"max_age\" : \"5s\"\n" + + " },\n" + + " \"set_priority\" : {\n" + + " \"priority\" : 150\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\" : 1,\n" + + " \"modified_date_in_millis\" : 1578521007076\n" + + " }") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + Map actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, null, 1L)); + actions.put("set_priority", new SetPriorityAction(100)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + + assertFalse(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy)); + } + + // Failure case, the phase JSON is unparseable + { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhase("hot") + .setAction("rollover") + .setStep("check-rollover-ready") + .setPhaseDefinition("potato") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + Map actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, null, 1L)); + actions.put("set_priority", new SetPriorityAction(100)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + + assertFalse(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy)); + } + } + + public void testRefreshPhaseJson() { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhase("hot") + .setAction("rollover") + .setStep("check-rollover-ready") + .setPhaseDefinition("{\n" + + " \"policy\" : \"my-policy\",\n" + + " \"phase_definition\" : {\n" + + " \"min_age\" : \"20m\",\n" + + " \"actions\" : {\n" + + " \"rollover\" : {\n" + + " \"max_age\" : \"5s\"\n" + + " },\n" + + " \"set_priority\" : {\n" + + " \"priority\" : 150\n" + + " }\n" + + " }\n" + + " },\n" + + " \"version\" : 1,\n" + + " \"modified_date_in_millis\" : 1578521007076\n" + + " }") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + Map actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, null, 1L)); + actions.put("set_priority", new SetPriorityAction(100)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap(), 2L, 2L); + + ClusterState existingState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metaData(MetaData.builder(MetaData.EMPTY_META_DATA) + .put(meta, false) + .build()) + .build(); + + ClusterState changedState = TransportPutLifecycleAction.refreshPhaseDefinition(existingState, index, policyMetadata); + + IndexMetaData newIdxMeta = changedState.metaData().index(index); + LifecycleExecutionState afterExState = LifecycleExecutionState.fromIndexMetadata(newIdxMeta); + Map beforeState = new HashMap<>(exState.asMap()); + beforeState.remove("phase_definition"); + Map afterState = new HashMap<>(afterExState.asMap()); + afterState.remove("phase_definition"); + // Check that no other execution state changes have been made + assertThat(beforeState, equalTo(afterState)); + + // Check that the phase definition has been refreshed + assertThat(afterExState.getPhaseDefinition(), + equalTo("{\"policy\":\"my-policy\",\"phase_definition\":{\"min_age\":\"0ms\",\"actions\":{\"rollover\":{\"max_docs\":1}," + + "\"set_priority\":{\"priority\":100}}},\"version\":2,\"modified_date_in_millis\":2}")); + } + + public void testUpdateIndicesForPolicy() { + LifecycleExecutionState exState = LifecycleExecutionState.builder() + .setPhase("hot") + .setAction("rollover") + .setStep("check-rollover-ready") + .setPhaseDefinition("{\"policy\":\"my-policy\",\"phase_definition\":{\"min_age\":\"0ms\",\"actions\":{\"rollover\":" + + "{\"max_docs\":1},\"set_priority\":{\"priority\":100}}},\"version\":1,\"modified_date_in_millis\":1578521007076}") + .build(); + + IndexMetaData meta = mkMeta() + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + + assertTrue(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta)); + + Map oldActions = new HashMap<>(); + oldActions.put("rollover", new RolloverAction(null, null, 1L)); + oldActions.put("set_priority", new SetPriorityAction(100)); + Phase oldHotPhase = new Phase("hot", TimeValue.ZERO, oldActions); + Map oldPhases = Collections.singletonMap("hot", oldHotPhase); + LifecyclePolicy oldPolicy = new LifecyclePolicy("my-policy", oldPhases); + + Map actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, null, 1L)); + actions.put("set_priority", new SetPriorityAction(100)); + Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions); + Map phases = Collections.singletonMap("hot", hotPhase); + LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases); + LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap(), 2L, 2L); + + assertTrue(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy)); + + ClusterState existingState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metaData(MetaData.builder(MetaData.EMPTY_META_DATA) + .put(meta, false) + .build()) + .build(); + + logger.info("--> update for unchanged policy"); + ClusterState updatedState = TransportPutLifecycleAction.updateIndicesForPolicy(existingState, REGISTRY, + client, oldPolicy, policyMetadata); + + // No change, because the policies were identical + assertThat(updatedState, equalTo(existingState)); + + actions = new HashMap<>(); + actions.put("rollover", new RolloverAction(null, null, 2L)); + actions.put("set_priority", new SetPriorityAction(150)); + hotPhase = new Phase("hot", TimeValue.ZERO, actions); + phases = Collections.singletonMap("hot", hotPhase); + newPolicy = new LifecyclePolicy("my-policy", phases); + policyMetadata = new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap(), 2L, 2L); + + logger.info("--> update with changed policy, but not configured in settings"); + updatedState = TransportPutLifecycleAction.updateIndicesForPolicy(existingState, REGISTRY, client, oldPolicy, policyMetadata); + + // No change, because the index doesn't have a lifecycle.name setting for this policy + assertThat(updatedState, equalTo(existingState)); + + meta = IndexMetaData.builder(index) + .settings(Settings.builder() + .put(LifecycleSettings.LIFECYCLE_NAME, "my-policy") + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 5)) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, randomAlphaOfLength(5))) + .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()) + .build(); + existingState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metaData(MetaData.builder(MetaData.EMPTY_META_DATA) + .put(meta, false) + .build()) + .build(); + + logger.info("--> update with changed policy and this index has the policy"); + updatedState = TransportPutLifecycleAction.updateIndicesForPolicy(existingState, REGISTRY, client, oldPolicy, policyMetadata); + + IndexMetaData newIdxMeta = updatedState.metaData().index(index); + LifecycleExecutionState afterExState = LifecycleExecutionState.fromIndexMetadata(newIdxMeta); + Map beforeState = new HashMap<>(exState.asMap()); + beforeState.remove("phase_definition"); + Map afterState = new HashMap<>(afterExState.asMap()); + afterState.remove("phase_definition"); + // Check that no other execution state changes have been made + assertThat(beforeState, equalTo(afterState)); + + // Check that the phase definition has been refreshed + assertThat(afterExState.getPhaseDefinition(), + equalTo("{\"policy\":\"my-policy\",\"phase_definition\":{\"min_age\":\"0ms\",\"actions\":{\"rollover\":{\"max_docs\":2}," + + "\"set_priority\":{\"priority\":150}}},\"version\":2,\"modified_date_in_millis\":2}")); + } + + private static IndexMetaData.Builder mkMeta() { + return IndexMetaData.builder(index) + .settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 5)) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, randomAlphaOfLength(5))); + } +}