Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private enum OpType {
* exist. If no nodes for any of the tiers are available, returns an empty
* {@code Optional<String>}.
*/
static Optional<String> preferredAvailableTier(String prioritizedTiers, DiscoveryNodes nodes) {
public static Optional<String> preferredAvailableTier(String prioritizedTiers, DiscoveryNodes nodes) {
String[] tiers = Strings.tokenizeToStringArray(prioritizedTiers, ",");
return Arrays.stream(tiers).filter(tier -> tierNodesPresent(tier, nodes)).findFirst();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
Expand All @@ -24,9 +22,9 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_ROLE;
import static org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING;
import static org.elasticsearch.xpack.core.ilm.AllocationRoutedStep.getPendingAllocations;
import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.waitingForActiveShardsAllocationInfo;
Expand Down Expand Up @@ -73,44 +71,49 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().getAction(), index.getName());
return new Result(false, null);
}
String destinationTier = INDEX_ROUTING_PREFER_SETTING.get(idxMeta.getSettings());
String preferredTierConfiguration = INDEX_ROUTING_PREFER_SETTING.get(idxMeta.getSettings());
Optional<String> availableDestinationTier = DataTierAllocationDecider.preferredAvailableTier(preferredTierConfiguration,
clusterState.getNodes());

if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
if (Strings.isEmpty(destinationTier)) {
if (Strings.isEmpty(preferredTierConfiguration)) {
logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active",
getKey().getAction(), index.getName());
} else {
logger.debug("[{}] migration of index [{}] to the [{}] tier cannot progress, as not all shards are active",
getKey().getAction(), index.getName(), destinationTier);
if (availableDestinationTier.isPresent()) {
logger.debug("[{}] migration of index [{}] to the [{}] tier preference cannot progress, as not all shards are active",
getKey().getAction(), index.getName(), preferredTierConfiguration);
} else {
logger.debug("[{}] migration of index [{}] to the next tier cannot progress as there is no available tier for the " +
"configured preferred tiers [{}] and not all shards are active", getKey().getAction(), index.getName(),
preferredTierConfiguration);
}
}
return new Result(false, waitingForActiveShardsAllocationInfo(idxMeta.getNumberOfReplicas()));
}

if (Strings.isEmpty(destinationTier)) {
logger.debug("index [{}] has no data tier routing setting configured and all its shards are active. considering the [{}] " +
"step condition met and continuing to the next step", index.getName(), getKey().getName());
if (Strings.isEmpty(preferredTierConfiguration)) {
logger.debug("index [{}] has no data tier routing preference setting configured and all its shards are active. considering " +
"the [{}] step condition met and continuing to the next step", index.getName(), getKey().getName());
// the user removed the tier routing setting and all the shards are active so we'll cary on
return new Result(true, null);
}

int allocationPendingAllShards = getPendingAllocations(index, ALLOCATION_DECIDERS, clusterState);

if (allocationPendingAllShards > 0) {
boolean targetTierNodeFound = false;
for (DiscoveryNode node : clusterState.nodes()) {
for (DiscoveryNodeRole role : node.getRoles()) {
if (role.roleName().equals(DATA_ROLE.roleName()) || role.roleName().equals(destinationTier)) {
targetTierNodeFound = true;
break;
}
}
}
String statusMessage = String.format(Locale.ROOT, "%s lifecycle action [%s] waiting for [%s] shards to be moved to the [%s] " +
"tier" + (targetTierNodeFound ? "" : " but there are currently no [%s] nodes in the cluster"),
index, getKey().getAction(), allocationPendingAllShards, destinationTier, destinationTier);
String statusMessage = availableDestinationTier.map(
s -> String.format(Locale.ROOT, "[%s] lifecycle action [%s] waiting for [%s] shards to be moved to the [%s] tier (tier " +
"migration preference configuration is [%s])", index.getName(), getKey().getAction(), allocationPendingAllShards, s,
preferredTierConfiguration)
).orElseGet(
() -> String.format(Locale.ROOT, "index [%s] has a preference for tiers [%s], but no nodes for any of those tiers are " +
"available in the cluster", index.getName(), preferredTierConfiguration));
logger.debug(statusMessage);
return new Result(false, new AllocationInfo(idxMeta.getNumberOfReplicas(), allocationPendingAllShards, true, statusMessage));
} else {
logger.debug("[{}] migration of index [{}] to tier [{}] complete", getKey().getAction(), index, destinationTier);
logger.debug("[{}] migration of index [{}] to tier [{}] (preference [{}]) complete",
getKey().getAction(), index, availableDestinationTier, preferredTierConfiguration);
return new Result(true, null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* A {@link LifecycleAction} which enables or disables the automatic migration of data between
Expand All @@ -31,6 +32,9 @@ public class MigrateAction implements LifecycleAction {
public static final String NAME = "migrate";
public static final ParseField ENABLED_FIELD = new ParseField("enabled");

// Represents an ordered list of data tiers from cold to hot (or slow to fast)
private static final List<String> COLD_TO_HOT_TIERS = List.of(DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT);

private static final ConstructingObjectParser<MigrateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new MigrateAction(a[0] == null ? true : (boolean) a[0]));

Expand Down Expand Up @@ -92,7 +96,7 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
Settings.Builder migrationSettings = Settings.builder();
String dataTierName = "data_" + phase;
assert DataTier.validTierName(dataTierName) : "invalid data tier name:" + dataTierName;
migrationSettings.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, dataTierName);
migrationSettings.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, getPreferredTiersConfiguration(dataTierName));
UpdateSettingsStep updateMigrationSettingStep = new UpdateSettingsStep(migrationKey, migrationRoutedKey, client,
migrationSettings.build());
DataTierMigrationRoutedStep migrationRoutedStep = new DataTierMigrationRoutedStep(migrationRoutedKey, nextStepKey);
Expand All @@ -102,6 +106,19 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
}
}

/**
* Based on the provided target tier it will return a comma separated list of preferred tiers.
* ie. if `data_cold` is the target tier, it will return `data_cold,data_warm,data_hot`
* This is usually used in conjunction with {@link DataTierAllocationDecider#INDEX_ROUTING_PREFER_SETTING}
*/
static String getPreferredTiersConfiguration(String targetTier) {
int indexOfTargetTier = COLD_TO_HOT_TIERS.indexOf(targetTier);
if (indexOfTargetTier == -1) {
throw new IllegalArgumentException("invalid data tier [" + targetTier + "]");
}
return COLD_TO_HOT_TIERS.stream().skip(indexOfTargetTier).collect(Collectors.joining(","));
}

@Override
public int hashCode() {
return Objects.hash(enabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testExecuteWithPendingShards() {
DataTierMigrationRoutedStep step = createRandomInstance();
Result expectedResult = new Result(false, new AllocationInfo(0, 1, true,
"[" + index.getName() + "] lifecycle action [" + step.getKey().getAction() + "] waiting for " +
"[1] shards to be moved to the [data_warm] tier")
"[1] shards to be moved to the [data_warm] tier (tier migration preference configuration is [data_warm])")
);

Result actualResult = step.isConditionMet(index, clusterState);
Expand All @@ -137,9 +137,8 @@ public void testExecuteWithPendingShardsAndTargetRoleNotPresentInCluster() {
.build();
DataTierMigrationRoutedStep step = createRandomInstance();
Result expectedResult = new Result(false, new AllocationInfo(0, 1, true,
"[" + index.getName() + "] lifecycle action [" + step.getKey().getAction() + "] waiting for " +
"[1] shards to be moved to the [data_warm] tier but there are currently no [data_warm] nodes in the cluster")
);
"index [" + index.getName() + "] has a preference for tiers [data_warm], but no nodes for any of those tiers are available " +
"in the cluster"));

Result actualResult = step.isConditionMet(index, clusterState);
assertThat(actualResult.isComplete(), is(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@

import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.xpack.core.DataTier.DATA_COLD;
import static org.elasticsearch.xpack.core.DataTier.DATA_HOT;
import static org.elasticsearch.xpack.core.DataTier.DATA_WARM;
import static org.elasticsearch.xpack.core.ilm.MigrateAction.getPreferredTiersConfiguration;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.COLD_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.DELETE_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.HOT_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.WARM_PHASE;
import static org.hamcrest.CoreMatchers.is;

public class MigrateActionTests extends AbstractActionTestCase<MigrateAction> {

Expand Down Expand Up @@ -56,4 +65,36 @@ public void testToSteps() {
assertEquals(0, steps.size());
}
}

public void testGetPreferredTiersConfiguration() {
assertThat(getPreferredTiersConfiguration(DATA_HOT), is(DATA_HOT));
assertThat(getPreferredTiersConfiguration(DATA_WARM), is(DATA_WARM + "," + DATA_HOT));
assertThat(getPreferredTiersConfiguration(DATA_COLD), is(DATA_COLD + "," + DATA_WARM + "," + DATA_HOT));
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> getPreferredTiersConfiguration("no_tier"));
assertThat(exception.getMessage(), is("invalid data tier [no_tier]"));
}

public void testMigrateActionsConfiguresTierPreference() {
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10));
MigrateAction action = new MigrateAction();
{
List<Step> steps = action.toSteps(null, HOT_PHASE, nextStepKey);
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0);
assertThat(DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING.get(firstStep.getSettings()),
is(DATA_HOT));
}
{
List<Step> steps = action.toSteps(null, WARM_PHASE, nextStepKey);
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0);
assertThat(DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING.get(firstStep.getSettings()),
is(DATA_WARM + "," + DATA_HOT));
}
{
List<Step> steps = action.toSteps(null, COLD_PHASE, nextStepKey);
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0);
assertThat(DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING.get(firstStep.getSettings()),
is(DATA_COLD + "," + DATA_WARM + "," + DATA_HOT));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,19 @@ public static Settings coldNode(final Settings settings) {

public void testIndexDataTierMigration() throws Exception {
internalCluster().startMasterOnlyNodes(1, Settings.EMPTY);
logger.info("starting hot data node");
logger.info("starting 2 hot data nodes");
internalCluster().startNode(hotNode(Settings.EMPTY));
internalCluster().startNode(hotNode(Settings.EMPTY));

// it's important we start one node of each tear as otherwise all phases will be allocated on the 2 available hot nodes (as our
// tier preference configuration will not detect any available warm/cold tier node and will fallback to the available hot tier)
// we want ILM to stop in the check-migration step in the warm and cold phase so we can unblock it manually by starting another
// node in the corresponding tier (so that the index replica is allocated)
logger.info("starting a warm data node");
internalCluster().startNode(warmNode(Settings.EMPTY));

logger.info("starting a cold data node");
internalCluster().startNode(coldNode(Settings.EMPTY));

Phase hotPhase = new Phase("hot", TimeValue.ZERO, Collections.emptyMap());
Phase warmPhase = new Phase("warm", TimeValue.ZERO, Collections.emptyMap());
Expand All @@ -104,7 +115,7 @@ public void testIndexDataTierMigration() throws Exception {
assertAcked(putLifecycleResponse);

Settings settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0).put(LifecycleSettings.LIFECYCLE_NAME, policy).build();
.put(SETTING_NUMBER_OF_REPLICAS, 1).put(LifecycleSettings.LIFECYCLE_NAME, policy).build();
CreateIndexResponse res = client().admin().indices().prepareCreate(managedIndex).setSettings(settings).get();
assertTrue(res.isAcknowledged());

Expand All @@ -118,7 +129,7 @@ public void testIndexDataTierMigration() throws Exception {
assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME));
});

logger.info("starting warm data node");
logger.info("starting a warm data node");
internalCluster().startNode(warmNode(Settings.EMPTY));
assertBusy(() -> {
ExplainLifecycleRequest explainRequest = new ExplainLifecycleRequest().indices(managedIndex);
Expand All @@ -130,7 +141,7 @@ public void testIndexDataTierMigration() throws Exception {
assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME));
});

logger.info("starting cold data node");
logger.info("starting a cold data node");
internalCluster().startNode(coldNode(Settings.EMPTY));

// wait for lifecycle to complete in the cold phase after the index has been migrated to the cold node
Expand All @@ -147,9 +158,15 @@ public void testIndexDataTierMigration() throws Exception {

public void testUserOptsOutOfTierMigration() throws Exception {
internalCluster().startMasterOnlyNodes(1, Settings.EMPTY);
logger.info("starting hot data node");
logger.info("starting a hot data node");
internalCluster().startNode(hotNode(Settings.EMPTY));

logger.info("starting a warm data node");
internalCluster().startNode(warmNode(Settings.EMPTY));

logger.info("starting a cold data node");
internalCluster().startNode(coldNode(Settings.EMPTY));

Phase hotPhase = new Phase("hot", TimeValue.ZERO, Collections.emptyMap());
Phase warmPhase = new Phase("warm", TimeValue.ZERO, Collections.emptyMap());
Phase coldPhase = new Phase("cold", TimeValue.ZERO, Collections.emptyMap());
Expand All @@ -171,26 +188,14 @@ public void testUserOptsOutOfTierMigration() throws Exception {
IndexLifecycleExplainResponse indexLifecycleExplainResponse = explainResponse.getIndexResponses().get(managedIndex);
assertThat(indexLifecycleExplainResponse.getPhase(), is("warm"));
assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME));
});
assertReplicaIsUnassigned();
}, 30, TimeUnit.SECONDS);

Settings removeTierRoutingSetting = Settings.builder().putNull(DataTierAllocationDecider.INDEX_ROUTING_PREFER).build();
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(managedIndex).settings(removeTierRoutingSetting);
assertAcked(client().admin().indices().updateSettings(updateSettingsRequest).actionGet());

assertBusy(() -> {
ExplainLifecycleRequest explainRequest = new ExplainLifecycleRequest().indices(managedIndex);
ExplainLifecycleResponse explainResponse = client().execute(ExplainLifecycleAction.INSTANCE,
explainRequest).get();

IndexLifecycleExplainResponse indexLifecycleExplainResponse = explainResponse.getIndexResponses().get(managedIndex);
assertThat(indexLifecycleExplainResponse.getPhase(), is("warm"));
assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME));
assertReplicaIsUnassigned();
}, 30, TimeUnit.SECONDS);

internalCluster().startNode(coldNode(Settings.EMPTY));

// the index should successfully allocate
// the index should successfully allocate on any nodes
ensureGreen(managedIndex);

// the index is successfully allocated but the migrate action from the cold phase re-configured the tier migration setting to the
Expand All @@ -206,7 +211,7 @@ public void testUserOptsOutOfTierMigration() throws Exception {
IndexLifecycleExplainResponse indexLifecycleExplainResponse = explainResponse.getIndexResponses().get(managedIndex);
assertThat(indexLifecycleExplainResponse.getPhase(), is("cold"));
assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME));
});
}, 30, TimeUnit.SECONDS);

// remove the tier routing setting again
assertAcked(client().admin().indices().updateSettings(updateSettingsRequest).actionGet());
Expand Down