Skip to content

Commit

Permalink
Rebuild step on PolicyStepsRegistry.getStep (#33780)
Browse files Browse the repository at this point in the history
This moves away from caching a list of steps for a current phase, instead
rebuilding the necessary step from the phase JSON stored in the index's
metadata.

Relates to #29823
  • Loading branch information
dakrone committed Sep 18, 2018
1 parent 11a55d2 commit 27dd258
Show file tree
Hide file tree
Showing 8 changed files with 274 additions and 258 deletions.
Expand Up @@ -82,6 +82,55 @@ protected LifecyclePolicy createTestInstance() {
return randomTimeseriesLifecyclePolicy(lifecycleName);
}

/**
* The same as {@link #randomTimeseriesLifecyclePolicy(String)} but ensures
* that the resulting policy has all valid phases and all valid actions.
*/
public static LifecyclePolicy randomTimeseriesLifecyclePolicyWithAllPhases(@Nullable String lifecycleName) {
List<String> phaseNames = TimeseriesLifecycleType.VALID_PHASES;
Map<String, Phase> phases = new HashMap<>(phaseNames.size());
Function<String, Set<String>> validActions = (phase) -> {
switch (phase) {
case "hot":
return TimeseriesLifecycleType.VALID_HOT_ACTIONS;
case "warm":
return TimeseriesLifecycleType.VALID_WARM_ACTIONS;
case "cold":
return TimeseriesLifecycleType.VALID_COLD_ACTIONS;
case "delete":
return TimeseriesLifecycleType.VALID_DELETE_ACTIONS;
default:
throw new IllegalArgumentException("invalid phase [" + phase + "]");
}};
Function<String, LifecycleAction> randomAction = (action) -> {
switch (action) {
case AllocateAction.NAME:
return AllocateActionTests.randomInstance();
case DeleteAction.NAME:
return new DeleteAction();
case ForceMergeAction.NAME:
return ForceMergeActionTests.randomInstance();
case ReadOnlyAction.NAME:
return new ReadOnlyAction();
case RolloverAction.NAME:
return RolloverActionTests.randomInstance();
case ShrinkAction.NAME:
return ShrinkActionTests.randomInstance();
default:
throw new IllegalArgumentException("invalid action [" + action + "]");
}};
for (String phase : phaseNames) {
TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
Map<String, LifecycleAction> actions = new HashMap<>();
Set<String> actionNames = validActions.apply(phase);
for (String action : actionNames) {
actions.put(action, randomAction.apply(action));
}
phases.put(phase, new Phase(phase, after, actions));
}
return new LifecyclePolicy(TimeseriesLifecycleType.INSTANCE, lifecycleName, phases);
}

public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String lifecycleName) {
List<String> phaseNames = randomSubsetOf(TimeseriesLifecycleType.VALID_PHASES);
Map<String, Phase> phases = new HashMap<>(phaseNames.size());
Expand Down
Expand Up @@ -70,7 +70,7 @@ public ClusterState execute(ClusterState currentState) throws IOException {
// This index doesn't exist any more, there's nothing to execute currently
return currentState;
}
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, index,
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, indexMetaData,
indexMetaData.getSettings());
if (currentStep.equals(registeredCurrentStep)) {
// We can do cluster state steps all together until we
Expand Down Expand Up @@ -118,7 +118,7 @@ public ClusterState execute(ClusterState currentState) throws IOException {
if (currentStep.getKey().getPhase().equals(currentStep.getNextStepKey().getPhase()) == false) {
return currentState;
}
currentStep = policyStepsRegistry.getStep(index, currentStep.getNextStepKey());
currentStep = policyStepsRegistry.getStep(indexMetaData, currentStep.getNextStepKey());
}
return currentState;
} else {
Expand Down
Expand Up @@ -87,7 +87,7 @@ public void runPolicy(String policy, IndexMetaData indexMetaData, ClusterState c
+ LifecycleSettings.LIFECYCLE_SKIP + "== true");
return;
}
Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData.getIndex(), indexSettings);
Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, indexSettings);
if (currentStep == null) {
// This may happen in the case that there is invalid ilm-step index settings or the stepRegistry is out of
// sync with the current cluster state
Expand Down Expand Up @@ -197,12 +197,12 @@ public static StepKey getCurrentStepKey(Settings indexSettings) {
}
}

static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, Index index, Settings indexSettings) {
static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, IndexMetaData indexMetaData, Settings indexSettings) {
StepKey currentStepKey = getCurrentStepKey(indexSettings);
if (currentStepKey == null) {
return stepRegistry.getFirstStep(policy);
} else {
return stepRegistry.getStep(index, currentStepKey);
return stepRegistry.getStep(indexMetaData, currentStepKey);
}
}

Expand Down
Expand Up @@ -140,11 +140,6 @@ public void clusterChanged(ClusterChangedEvent event) {
public void applyClusterState(ClusterChangedEvent event) {
if (event.localNodeMaster()) { // only act if we are master, otherwise
// keep idle until elected
// Since indices keep their current phase's details even if the policy changes, it's possible for a deleted index to have a
// policy, and then be re-created with the same name, so here we remove indices that have been delete so they don't waste memory
if (event.indicesDeleted().isEmpty() == false) {
policyRegistry.removeIndices(event.indicesDeleted());
}
if (event.state().metaData().custom(IndexLifecycleMetadata.TYPE) != null) {
policyRegistry.update(event.state());
}
Expand Down
Expand Up @@ -5,9 +5,9 @@
*/
package org.elasticsearch.xpack.indexlifecycle;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
Expand All @@ -19,6 +19,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -53,26 +54,18 @@ public class PolicyStepsRegistry {
private final Map<String, Step> firstStepMap;
// keeps track of a mapping from policy/step-name to respective Step, the key is policy name
private final Map<String, Map<Step.StepKey, Step>> stepMap;
// A map of index to a list of compiled steps for the current phase
private final Map<Index, List<Step>> indexPhaseSteps;
private final NamedXContentRegistry xContentRegistry;

public PolicyStepsRegistry(NamedXContentRegistry xContentRegistry, Client client) {
this.lifecyclePolicyMap = new TreeMap<>();
this.firstStepMap = new HashMap<>();
this.stepMap = new HashMap<>();
this.indexPhaseSteps = new HashMap<>();
this.xContentRegistry = xContentRegistry;
this.client = client;
this(new TreeMap<>(), new HashMap<>(), new HashMap<>(), xContentRegistry, client);
}

PolicyStepsRegistry(SortedMap<String, LifecyclePolicyMetadata> lifecyclePolicyMap,
Map<String, Step> firstStepMap, Map<String, Map<Step.StepKey, Step>> stepMap,
Map<Index, List<Step>> indexPhaseSteps, NamedXContentRegistry xContentRegistry, Client client) {
NamedXContentRegistry xContentRegistry, Client client) {
this.lifecyclePolicyMap = lifecyclePolicyMap;
this.firstStepMap = firstStepMap;
this.stepMap = stepMap;
this.indexPhaseSteps = indexPhaseSteps;
this.xContentRegistry = xContentRegistry;
this.client = client;
}
Expand All @@ -89,17 +82,6 @@ Map<String, Map<Step.StepKey, Step>> getStepMap() {
return stepMap;
}

/**
* Remove phase step lists for indices that have been deleted
* @param indices a list of indices that have been deleted
*/
public void removeIndices(List<Index> indices) {
indices.forEach(index -> {
logger.trace("removing cached phase steps for deleted index [{}]", index.getName());
indexPhaseSteps.remove(index);
});
}

@SuppressWarnings({ "unchecked", "rawtypes" })
public void update(ClusterState clusterState) {
final IndexLifecycleMetadata meta = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE);
Expand Down Expand Up @@ -146,103 +128,86 @@ public LifecyclePolicyMetadata read(StreamInput in, String key) {
assert ErrorStep.NAME.equals(step.getKey().getName()) == false : "unexpected error step in policy";
stepMapForPolicy.put(step.getKey(), step);
}
logger.trace("updating cached steps for [{}] policy, new steps: {}",
policyMetadata.getName(), stepMapForPolicy.keySet());
stepMap.put(policyMetadata.getName(), stepMapForPolicy);
}
}
}
}

for (ObjectCursor<IndexMetaData> imd : clusterState.metaData().getIndices().values()) {
final Index index = imd.value.getIndex();
final String policy = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
if (policy == null || lifecyclePolicyMap.containsKey(policy) == false) {
indexPhaseSteps.remove(index);
} else {
final List<Step> currentSteps = indexPhaseSteps.get(index);
// Get the current steps' phase, if there are steps stored
final String existingPhase = (currentSteps == null || currentSteps.size() == 0) ?
"_none_" : currentSteps.get(0).getKey().getPhase();
// Retrieve the current phase, defaulting to "new" if no phase is set
final String currentPhase = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE,
InitializePolicyContextStep.INITIALIZATION_PHASE);

if (existingPhase.equals(currentPhase) == false) {
logger.debug("index [{}] has transitioned phases [{} -> {}], rebuilding step list",
index, existingPhase, currentPhase);
// parse existing phase steps from the phase definition in the index settings
String phaseDef = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION,
InitializePolicyContextStep.INITIALIZATION_PHASE);
final PhaseExecutionInfo phaseExecutionInfo;
LifecyclePolicy currentPolicy = lifecyclePolicyMap.get(policy).getPolicy();
final LifecyclePolicy policyToExecute;
if (InitializePolicyContextStep.INITIALIZATION_PHASE.equals(phaseDef)
|| TerminalPolicyStep.COMPLETED_PHASE.equals(phaseDef)) {
// It is ok to re-use potentially modified policy here since we are in an initialization or completed phase
policyToExecute = currentPolicy;
} else {
// if the current phase definition describes an internal step/phase, do not parse
try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase);
} catch (IOException e) {
logger.error("failed to configure phase [" + currentPhase + "] for index [" + index.getName() + "]", e);
indexPhaseSteps.remove(index);
continue;
}
Map<String, Phase> phaseMap = new HashMap<>(currentPolicy.getPhases());
if (phaseExecutionInfo.getPhase() != null) {
phaseMap.put(currentPhase, phaseExecutionInfo.getPhase());
}
policyToExecute = new LifecyclePolicy(currentPolicy.getType(), currentPolicy.getName(), phaseMap);
}
LifecyclePolicySecurityClient policyClient = new LifecyclePolicySecurityClient(client,
ClientHelper.INDEX_LIFECYCLE_ORIGIN, lifecyclePolicyMap.get(policy).getHeaders());
final List<Step> steps = policyToExecute.toSteps(policyClient);
// Build a list of steps that correspond with the phase the index is currently in
final List<Step> phaseSteps;
if (steps == null) {
phaseSteps = new ArrayList<>();
} else {
phaseSteps = steps.stream()
.filter(e -> e.getKey().getPhase().equals(currentPhase))
.collect(Collectors.toList());
}
indexPhaseSteps.put(index, phaseSteps);
}
private List<Step> parseStepsFromPhase(String policy, String currentPhase, String phaseDef) throws IOException {
final PhaseExecutionInfo phaseExecutionInfo;
LifecyclePolicy currentPolicy = lifecyclePolicyMap.get(policy).getPolicy();
final LifecyclePolicy policyToExecute;
if (InitializePolicyContextStep.INITIALIZATION_PHASE.equals(phaseDef)
|| TerminalPolicyStep.COMPLETED_PHASE.equals(phaseDef)) {
// It is ok to re-use potentially modified policy here since we are in an initialization or completed phase
policyToExecute = currentPolicy;
} else {
// if the current phase definition describes an internal step/phase, do not parse
try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase);
}
Map<String, Phase> phaseMap = new HashMap<>(currentPolicy.getPhases());
if (phaseExecutionInfo.getPhase() != null) {
phaseMap.put(currentPhase, phaseExecutionInfo.getPhase());
}
policyToExecute = new LifecyclePolicy(currentPolicy.getType(), currentPolicy.getName(), phaseMap);
}
LifecyclePolicySecurityClient policyClient = new LifecyclePolicySecurityClient(client,
ClientHelper.INDEX_LIFECYCLE_ORIGIN, lifecyclePolicyMap.get(policy).getHeaders());
final List<Step> steps = policyToExecute.toSteps(policyClient);
// Build a list of steps that correspond with the phase the index is currently in
final List<Step> phaseSteps;
if (steps == null) {
phaseSteps = new ArrayList<>();
} else {
phaseSteps = steps.stream()
.filter(e -> e.getKey().getPhase().equals(currentPhase))
.collect(Collectors.toList());
}
logger.trace("parsed steps for policy [{}] in phase [{}], definition: [{}], steps: [{}]",
policy, currentPhase, phaseDef, phaseSteps);
return phaseSteps;
}

/**
* returns the {@link Step} that matches the index name and
* stepkey specified. This is used by {@link ClusterState}
* readers that know the current policy and step by name
* as String values in the cluster state.
* @param index the index to get the step for
* @param stepKey the key to the requested {@link Step}
* @return the step for the given stepkey or null if the step was not found
*/
@Nullable
public Step getStep(final Index index, final Step.StepKey stepKey) {
public Step getStep(final IndexMetaData indexMetaData, final Step.StepKey stepKey) {
if (ErrorStep.NAME.equals(stepKey.getName())) {
return new ErrorStep(new Step.StepKey(stepKey.getPhase(), stepKey.getAction(), ErrorStep.NAME));
}

if (indexPhaseSteps.get(index) == null) {
return null;
final String phase = stepKey.getPhase();
final String policyName = indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
final Index index = indexMetaData.getIndex();

if (policyName == null) {
throw new IllegalArgumentException("failed to retrieve step " + stepKey + " as index [" + index.getName() + "] has no policy");
}

if (logger.isTraceEnabled()) {
logger.trace("[{}]: retrieving step [{}], found: [{}]\nall steps for this phase: [{}]", index, stepKey,
indexPhaseSteps.get(index).stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null),
indexPhaseSteps.get(index));
} else if (logger.isDebugEnabled()) {
logger.debug("[{}]: retrieving step [{}], found: [{}]", index, stepKey,
indexPhaseSteps.get(index).stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null));

// parse phase steps from the phase definition in the index settings
final String phaseJson = indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION,
InitializePolicyContextStep.INITIALIZATION_PHASE);

final List<Step> phaseSteps;
try {
phaseSteps = parseStepsFromPhase(policyName, phase, phaseJson);
} catch (IOException e) {
throw new ElasticsearchException("failed to load cached steps for " + stepKey, e);
} catch (XContentParseException parseErr) {
throw new XContentParseException(parseErr.getLocation(),
"failed to load cached steps for " + stepKey + " from [" + phaseJson + "]", parseErr);
}
assert indexPhaseSteps.get(index).stream().allMatch(step -> step.getKey().getPhase().equals(stepKey.getPhase())) :
"expected all steps for [" + index + "] to be in phase [" + stepKey.getPhase() +
"] but they were not, steps: " + indexPhaseSteps.get(index);
return indexPhaseSteps.get(index).stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null);

assert phaseSteps.stream().allMatch(step -> step.getKey().getPhase().equals(phase)) :
"expected phase steps loaded from phase definition for [" + index.getName() + "] to be in phase [" + phase +
"] but they were not, steps: " + phaseSteps;

// Return the step that matches the given stepKey or else null if we couldn't find it
return phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null);
}

/**
Expand Down

0 comments on commit 27dd258

Please sign in to comment.