Skip to content

Commit

Permalink
Service to migrate indices and ILM policies to data tiers (#73689) (#…
Browse files Browse the repository at this point in the history
…74287)

This adds a service that migrates the indices and ILM policies away from
custom node attribute allocation routing to data tiers. Optionally, it also
deletes one legacy index template.

(cherry picked from commit 6285fac)
Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
  • Loading branch information
andreidan committed Jun 18, 2021
1 parent 04bca48 commit 3ec3182
Show file tree
Hide file tree
Showing 11 changed files with 1,965 additions and 350 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* The {@code DataTier} class encapsulates the formalization of the "content",
Expand All @@ -40,6 +42,10 @@ public class DataTier {
public static final Set<String> ALL_DATA_TIERS =
new HashSet<>(Arrays.asList(DATA_CONTENT, DATA_HOT, DATA_WARM, DATA_COLD, DATA_FROZEN));

// Represents an ordered list of data tiers from frozen to hot (or slow to fast)
private static final List<String> ORDERED_FROZEN_TO_HOT_TIERS =
org.elasticsearch.core.List.of(DataTier.DATA_FROZEN, DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT);

/**
* Returns true if the given tier name is a valid tier
*/
Expand All @@ -51,6 +57,19 @@ public static boolean validTierName(String tierName) {
DATA_FROZEN.equals(tierName);
}

/**
* Based on the provided target tier it will return a comma separated list of preferred tiers.
* ie. if `data_cold` is the target tier, it will return `data_cold,data_warm,data_hot`
* This is usually used in conjunction with {@link DataTierAllocationDecider#INDEX_ROUTING_PREFER_SETTING}
*/
public static String getPreferredTiersConfiguration(String targetTier) {
int indexOfTargetTier = ORDERED_FROZEN_TO_HOT_TIERS.indexOf(targetTier);
if (indexOfTargetTier == -1) {
throw new IllegalArgumentException("invalid data tier [" + targetTier + "]");
}
return ORDERED_FROZEN_TO_HOT_TIERS.stream().skip(indexOfTargetTier).collect(Collectors.joining(","));
}

/**
* Returns true iff the given settings have a data tier setting configured
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.DataTier.getPreferredTiersConfiguration;

/**
* A {@link LifecycleAction} which enables or disables the automatic migration of data between
Expand All @@ -37,9 +38,6 @@ public class MigrateAction implements LifecycleAction {

private static final Logger logger = LogManager.getLogger(MigrateAction.class);
static final String CONDITIONAL_SKIP_MIGRATE_STEP = BranchingStep.NAME + "-check-skip-action";
// Represents an ordered list of data tiers from frozen to hot (or slow to fast)
private static final List<String> FROZEN_TO_HOT_TIERS =
org.elasticsearch.core.List.of(DataTier.DATA_FROZEN, DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT);

private static final ConstructingObjectParser<MigrateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new MigrateAction(a[0] == null ? true : (boolean) a[0]));
Expand Down Expand Up @@ -128,19 +126,6 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
}
}

/**
* Based on the provided target tier it will return a comma separated list of preferred tiers.
* ie. if `data_cold` is the target tier, it will return `data_cold,data_warm,data_hot`
* This is usually used in conjunction with {@link DataTierAllocationDecider#INDEX_ROUTING_PREFER_SETTING}
*/
static String getPreferredTiersConfiguration(String targetTier) {
int indexOfTargetTier = FROZEN_TO_HOT_TIERS.indexOf(targetTier);
if (indexOfTargetTier == -1) {
throw new IllegalArgumentException("invalid data tier [" + targetTier + "]");
}
return FROZEN_TO_HOT_TIERS.stream().skip(indexOfTargetTier).collect(Collectors.joining(","));
}

@Override
public int hashCode() {
return Objects.hash(enabled);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ilm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.core.Nullable;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;

/**
* We cache the currently executing ILM phase in the index metadata so the ILM execution for managed indices is not irrecoverably
* interrupted by a concurrent update policy that, say, would remove the current execution phase altogether.
* <p>
* This contains class contains a series of methods that help manage the cached ILM phase.
*/
public final class PhaseCacheManagement {

private static final Logger logger = LogManager.getLogger(PhaseCacheManagement.class);

private PhaseCacheManagement() {
}

/**
* Rereads the phase JSON for the given index, returning a new cluster state.
*/
public static ClusterState refreshPhaseDefinition(final ClusterState state, final String index,
final LifecyclePolicyMetadata updatedPolicy) {
final IndexMetadata idxMeta = state.metadata().index(index);
Metadata.Builder metadataBuilder = Metadata.builder(state.metadata());
refreshPhaseDefinition(metadataBuilder, idxMeta, updatedPolicy);
return ClusterState.builder(state).metadata(metadataBuilder).build();
}

/**
* Rereads the phase JSON for the given index, and updates the provided metadata.
*/
public static void refreshPhaseDefinition(final Metadata.Builder metadataBuilder, final IndexMetadata idxMeta,
final LifecyclePolicyMetadata updatedPolicy) {
String index = idxMeta.getIndex().getName();
assert eligibleToCheckForRefresh(idxMeta) : "index " + index + " is missing crucial information needed to refresh phase definition";

logger.trace("[{}] updating cached phase definition for policy [{}]", index, updatedPolicy.getName());
LifecycleExecutionState currentExState = LifecycleExecutionState.fromIndexMetadata(idxMeta);

String currentPhase = currentExState.getPhase();
PhaseExecutionInfo pei = new PhaseExecutionInfo(updatedPolicy.getName(),
updatedPolicy.getPolicy().getPhases().get(currentPhase), updatedPolicy.getVersion(), updatedPolicy.getModifiedDate());

LifecycleExecutionState newExState = LifecycleExecutionState.builder(currentExState)
.setPhaseDefinition(Strings.toString(pei, false, false))
.build();

metadataBuilder.put(IndexMetadata.builder(idxMeta)
.putCustom(ILM_CUSTOM_METADATA_KEY, newExState.asMap()));
}


/**
* Ensure that we have the minimum amount of metadata necessary to check for cache phase
* refresh. This includes:
* - An execution state
* - Existing phase definition JSON
* - A current step key
* - A current phase in the step key
* - Not currently in the ERROR step
*/
public static boolean eligibleToCheckForRefresh(final IndexMetadata metadata) {
LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metadata);
if (executionState == null || executionState.getPhaseDefinition() == null) {
return false;
}

Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState);
if (currentStepKey == null || currentStepKey.getPhase() == null) {
return false;
}

return ErrorStep.NAME.equals(currentStepKey.getName()) == false;
}

/**
* For the given new policy, returns a new cluster with all updateable indices' phase JSON refreshed.
*/
public static ClusterState updateIndicesForPolicy(final ClusterState state, final NamedXContentRegistry xContentRegistry,
final Client client, final LifecyclePolicy oldPolicy,
final LifecyclePolicyMetadata newPolicy) {
Metadata.Builder mb = Metadata.builder(state.metadata());
if (updateIndicesForPolicy(mb, state, xContentRegistry, client, oldPolicy, newPolicy)) {
return ClusterState.builder(state).metadata(mb).build();
}
return state;
}

/**
* For the given new policy, update the provided metadata to reflect the refreshed phase JSON for all updateable indices.
* Returns true if any indices were updated and false otherwise.
* Users of this API should consider the returned value and only create a new {@link ClusterState} if `true` is returned.
*/
public static boolean updateIndicesForPolicy(final Metadata.Builder mb, final ClusterState currentState,
final NamedXContentRegistry xContentRegistry, final Client client,
final LifecyclePolicy oldPolicy, final LifecyclePolicyMetadata newPolicy) {
assert oldPolicy.getName().equals(newPolicy.getName()) : "expected both policies to have the same id but they were: [" +
oldPolicy.getName() + "] vs. [" + newPolicy.getName() + "]";

// No need to update anything if the policies are identical in contents
if (oldPolicy.equals(newPolicy.getPolicy())) {
logger.debug("policy [{}] is unchanged and no phase definition refresh is needed", oldPolicy.getName());
return false;
}

final List<IndexMetadata> indicesThatCanBeUpdated =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(currentState.metadata().indices().valuesIt(), 0), false)
.filter(meta -> newPolicy.getName().equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(meta.getSettings())))
.filter(meta -> isIndexPhaseDefinitionUpdatable(xContentRegistry, client, meta, newPolicy.getPolicy()))
.collect(Collectors.toList());

final List<String> refreshedIndices = new ArrayList<>(indicesThatCanBeUpdated.size());
for (IndexMetadata index : indicesThatCanBeUpdated) {
try {
refreshPhaseDefinition(mb, index, newPolicy);
refreshedIndices.add(index.getIndex().getName());
} catch (Exception e) {
logger.warn(new ParameterizedMessage("[{}] unable to refresh phase definition for updated policy [{}]",
index, newPolicy.getName()), e);
}
}
logger.debug("refreshed policy [{}] phase definition for [{}] indices", newPolicy.getName(), refreshedIndices.size());
return refreshedIndices.size() > 0;
}

/**
* Returns 'true' if the index's cached phase JSON can be safely reread, 'false' otherwise.
*/
public static boolean isIndexPhaseDefinitionUpdatable(final NamedXContentRegistry xContentRegistry, final Client client,
final IndexMetadata metadata, final LifecyclePolicy newPolicy) {
final String index = metadata.getIndex().getName();
if (eligibleToCheckForRefresh(metadata) == false) {
logger.debug("[{}] does not contain enough information to check for eligibility of refreshing phase", index);
return false;
}
final String policyId = newPolicy.getName();

final LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metadata);
final Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState);
final String currentPhase = currentStepKey.getPhase();

final Set<Step.StepKey> newStepKeys = newPolicy.toSteps(client).stream()
.map(Step::getKey)
.collect(Collectors.toCollection(LinkedHashSet::new));

if (newStepKeys.contains(currentStepKey) == false) {
// The index is on a step that doesn't exist in the new policy, we
// can't safely re-read the JSON
logger.debug("[{}] updated policy [{}] does not contain the current step key [{}], so the policy phase will not be refreshed",
index, policyId, currentStepKey);
return false;
}

final String phaseDef = executionState.getPhaseDefinition();
final Set<Step.StepKey> oldStepKeys = readStepKeys(xContentRegistry, client, phaseDef, currentPhase);
if (oldStepKeys == null) {
logger.debug("[{}] unable to parse phase definition for cached policy [{}], policy phase will not be refreshed",
index, policyId);
return false;
}

final Set<Step.StepKey> oldPhaseStepKeys = oldStepKeys.stream()
.filter(sk -> currentPhase.equals(sk.getPhase()))
.collect(Collectors.toCollection(LinkedHashSet::new));

final PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo(policyId, newPolicy.getPhases().get(currentPhase), 1L, 1L);
final String peiJson = Strings.toString(phaseExecutionInfo);

final Set<Step.StepKey> newPhaseStepKeys = readStepKeys(xContentRegistry, client, peiJson, currentPhase);
if (newPhaseStepKeys == null) {
logger.debug(new ParameterizedMessage("[{}] unable to parse phase definition for policy [{}] " +
"to determine if it could be refreshed", index, policyId));
return false;
}

if (newPhaseStepKeys.equals(oldPhaseStepKeys)) {
// The new and old phase have the same stepkeys for this current phase, so we can
// refresh the definition because we know it won't change the execution flow.
logger.debug("[{}] updated policy [{}] contains the same phase step keys and can be refreshed", index, policyId);
return true;
} else {
logger.debug("[{}] updated policy [{}] has different phase step keys and will NOT refresh phase " +
"definition as it differs too greatly. old: {}, new: {}",
index, policyId, oldPhaseStepKeys, newPhaseStepKeys);
return false;
}
}

/**
* Parse the {@code phaseDef} phase definition to get the stepkeys for the given phase.
* If there is an error parsing or if the phase definition is missing the required
* information, returns null.
*/
@Nullable
static Set<Step.StepKey> readStepKeys(final NamedXContentRegistry xContentRegistry, final Client client,
final String phaseDef, final String currentPhase) {
final PhaseExecutionInfo phaseExecutionInfo;
try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase);
} catch (Exception e) {
logger.trace(new ParameterizedMessage("exception reading step keys checking for refreshability, phase definition: {}",
phaseDef), e);
return null;
}

if (phaseExecutionInfo == null || phaseExecutionInfo.getPhase() == null) {
return null;
}

return phaseExecutionInfo.getPhase().getActions().values().stream()
.flatMap(a -> a.toSteps(client, phaseExecutionInfo.getPhase().getName(), null).stream())
.map(Step::getKey)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;

import static org.elasticsearch.xpack.core.DataTier.DATA_COLD;
import static org.elasticsearch.xpack.core.DataTier.DATA_HOT;
import static org.elasticsearch.xpack.core.DataTier.DATA_WARM;
import static org.elasticsearch.xpack.core.DataTier.getPreferredTiersConfiguration;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -131,6 +136,14 @@ public void testDisablingLegacyDataRoleDisablesTieredDataRoles() {
assertSettingDeprecationsAndWarnings(new Setting<?>[]{DiscoveryNodeRole.DATA_ROLE.legacySetting()});
}

public void testGetPreferredTiersConfiguration() {
assertThat(getPreferredTiersConfiguration(DATA_HOT), is(DATA_HOT));
assertThat(getPreferredTiersConfiguration(DATA_WARM), is(DATA_WARM + "," + DATA_HOT));
assertThat(getPreferredTiersConfiguration(DATA_COLD), is(DATA_COLD + "," + DATA_WARM + "," + DATA_HOT));
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> getPreferredTiersConfiguration("no_tier"));
assertThat(exception.getMessage(), is("invalid data tier [no_tier]"));
}

private static DiscoveryNodes buildDiscoveryNodes() {
int numNodes = randomIntBetween(3, 15);
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.elasticsearch.xpack.core.DataTier.DATA_COLD;
import static org.elasticsearch.xpack.core.DataTier.DATA_HOT;
import static org.elasticsearch.xpack.core.DataTier.DATA_WARM;
import static org.elasticsearch.xpack.core.ilm.MigrateAction.getPreferredTiersConfiguration;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.COLD_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.DELETE_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.HOT_PHASE;
Expand Down Expand Up @@ -83,14 +82,6 @@ public void testToSteps() {
}
}

public void testGetPreferredTiersConfiguration() {
assertThat(getPreferredTiersConfiguration(DATA_HOT), is(DATA_HOT));
assertThat(getPreferredTiersConfiguration(DATA_WARM), is(DATA_WARM + "," + DATA_HOT));
assertThat(getPreferredTiersConfiguration(DATA_COLD), is(DATA_COLD + "," + DATA_WARM + "," + DATA_HOT));
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> getPreferredTiersConfiguration("no_tier"));
assertThat(exception.getMessage(), is("invalid data tier [no_tier]"));
}

public void testMigrateActionsConfiguresTierPreference() {
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10));
Expand Down

0 comments on commit 3ec3182

Please sign in to comment.