Skip to content

Commit

Permalink
[ILM] Version the order of the actions in a phase.
Browse files Browse the repository at this point in the history
  • Loading branch information
andreidan committed Jul 19, 2023
1 parent 83a7a9d commit 05a4cd1
Show file tree
Hide file tree
Showing 16 changed files with 396 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,16 @@ public record LifecycleExecutionState(
String snapshotName,
String shrinkIndexName,
String snapshotIndexName,
String downsampleIndexName
String downsampleIndexName,
Integer actionsOrderVersion
) {

public LifecycleExecutionState {
if (actionsOrderVersion == null) {
actionsOrderVersion = 1;
}
}

public static final String ILM_CUSTOM_METADATA_KEY = "ilm";

private static final String PHASE = "phase";
Expand All @@ -58,6 +65,7 @@ public record LifecycleExecutionState(
private static final String SNAPSHOT_INDEX_NAME = "snapshot_index_name";
private static final String SHRINK_INDEX_NAME = "shrink_index_name";
private static final String DOWNSAMPLE_INDEX_NAME = "rollup_index_name";
private static final String ACTIONS_ORDER_VERSION = "actions_order_version";

public static final LifecycleExecutionState EMPTY_STATE = LifecycleExecutionState.builder().build();

Expand All @@ -82,7 +90,8 @@ public static Builder builder(LifecycleExecutionState state) {
.setShrinkIndexName(state.shrinkIndexName)
.setSnapshotIndexName(state.snapshotIndexName)
.setDownsampleIndexName(state.downsampleIndexName)
.setStepTime(state.stepTime);
.setStepTime(state.stepTime)
.setActionsOrderVersion(state.actionsOrderVersion);
}

public static LifecycleExecutionState fromCustomMetadata(Map<String, String> customData) {
Expand Down Expand Up @@ -191,6 +200,10 @@ public static LifecycleExecutionState fromCustomMetadata(Map<String, String> cus
if (downsampleIndexName != null) {
builder.setDownsampleIndexName(downsampleIndexName);
}
String actionsOrderVersion = customData.get(ACTIONS_ORDER_VERSION);
if (actionsOrderVersion != null) {
builder.setActionsOrderVersion(Integer.parseInt(actionsOrderVersion));
}
return builder.build();
}

Expand Down Expand Up @@ -253,6 +266,9 @@ public Map<String, String> asMap() {
if (downsampleIndexName != null) {
result.put(DOWNSAMPLE_INDEX_NAME, downsampleIndexName);
}
if (actionsOrderVersion != null) {
result.put(ACTIONS_ORDER_VERSION, String.valueOf(actionsOrderVersion));
}
return Collections.unmodifiableMap(result);
}

Expand All @@ -274,6 +290,7 @@ public static class Builder {
private String shrinkIndexName;
private String snapshotIndexName;
private String downsampleIndexName;
private Integer actionsOrderVersion;

public Builder setPhase(String phase) {
this.phase = phase;
Expand Down Expand Up @@ -360,6 +377,11 @@ public Builder setDownsampleIndexName(String downsampleIndexName) {
return this;
}

public Builder setActionsOrderVersion(Integer actionsOrderVersion) {
this.actionsOrderVersion = actionsOrderVersion;
return this;
}

public LifecycleExecutionState build() {
return new LifecycleExecutionState(
phase,
Expand All @@ -378,7 +400,8 @@ public LifecycleExecutionState build() {
snapshotName,
shrinkIndexName,
snapshotIndexName,
downsampleIndexName
downsampleIndexName,
actionsOrderVersion
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
* @return The list of {@link Step} objects in order of their execution.
*/
public List<Step> toSteps(Client client, XPackLicenseState licenseState) {
return toSteps(client, type.getLatestActionsOrderVersion(), licenseState);
}

public List<Step> toSteps(Client client, int orderVersion, XPackLicenseState licenseState) {
List<Step> steps = new ArrayList<>();
List<Phase> orderedPhases = type.getOrderedPhases(phases);
ListIterator<Phase> phaseIterator = orderedPhases.listIterator(orderedPhases.size());
Expand All @@ -233,7 +237,7 @@ public List<Step> toSteps(Client client, XPackLicenseState licenseState) {
}

phase = previousPhase;
List<LifecycleAction> orderedActions = type.getOrderedActions(phase);
List<LifecycleAction> orderedActions = type.getOrderedActions(phase, orderVersion);
ListIterator<LifecycleAction> actionIterator = orderedActions.listIterator(orderedActions.size());
// add steps for each action, in reverse
while (actionIterator.hasPrevious()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface LifecycleType extends NamedWriteable {
*/
List<Phase> getOrderedPhases(Map<String, Phase> phases);

List<LifecycleAction> getOrderedActions(Phase phase);
List<LifecycleAction> getOrderedActions(Phase phase, int orderVersion);

/**
* validates whether the specified <code>phases</code> are valid for this
Expand All @@ -31,4 +31,6 @@ public interface LifecycleType extends NamedWriteable {
* if a specific phase or lack of a specific phase is invalid.
*/
void validate(Collection<Phase> phases);

int getLatestActionsOrderVersion();
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public static void refreshPhaseDefinition(
updatedPolicy.getModifiedDate()
);

// we're not updating the actions order version here as we're still executing the same phase (and the step keys are identical)
LifecycleExecutionState newExState = LifecycleExecutionState.builder(currentExState)
.setPhaseDefinition(Strings.toString(pei, false, false))
.build();
Expand Down Expand Up @@ -192,7 +193,9 @@ public static boolean isIndexPhaseDefinitionUpdatable(
final Step.StepKey currentStepKey = Step.getCurrentStepKey(executionState);
final String currentPhase = currentStepKey.phase();

final Set<Step.StepKey> newStepKeys = newPolicy.toSteps(client, licenseState)
// reading the same order version of the new policy as if we refresh the cached phase, being the same phase
// as it's currently executing, we won't update the actions order version
final Set<Step.StepKey> newStepKeys = newPolicy.toSteps(client, executionState.actionsOrderVersion(), licenseState)
.stream()
.map(Step::getKey)
.collect(Collectors.toCollection(LinkedHashSet::new));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.common.util.set.Sets;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;

/**
* Defines a record of the actions that are allowed for the timeseries lifecycle.
* The order of actions is versioned, newer versions must contain all previous defined actions and are
* allowed to make additive changes (we do not support removing actions, but actions to be removed
* must be converted to no-ops)
*/
public final class TimeseriesLifecycleActionsRegistry {

public static final int VERSION_ONE = 1;
public static final int CURRENT_VERSION = VERSION_ONE;

static final String HOT_PHASE = "hot";
static final String WARM_PHASE = "warm";
static final String COLD_PHASE = "cold";
static final String FROZEN_PHASE = "frozen";
static final String DELETE_PHASE = "delete";

public static final Map<Integer, List<String>> ORDERED_VALID_HOT_ACTIONS = Map.of(
VERSION_ONE,
Stream.of(
SetPriorityAction.NAME,
UnfollowAction.NAME,
RolloverAction.NAME,
ReadOnlyAction.NAME,
DownsampleAction.NAME,
ShrinkAction.NAME,
ForceMergeAction.NAME,
SearchableSnapshotAction.NAME
).filter(Objects::nonNull).toList()
);

public static final Map<Integer, List<String>> ORDERED_VALID_WARM_ACTIONS = Map.of(
VERSION_ONE,
Stream.of(
SetPriorityAction.NAME,
UnfollowAction.NAME,
ReadOnlyAction.NAME,
DownsampleAction.NAME,
AllocateAction.NAME,
MigrateAction.NAME,
ShrinkAction.NAME,
ForceMergeAction.NAME
).filter(Objects::nonNull).toList()
);

public static final Map<Integer, List<String>> ORDERED_VALID_COLD_ACTIONS = Map.of(
VERSION_ONE,
Stream.of(
SetPriorityAction.NAME,
UnfollowAction.NAME,
ReadOnlyAction.NAME,
DownsampleAction.NAME,
SearchableSnapshotAction.NAME,
AllocateAction.NAME,
MigrateAction.NAME,
FreezeAction.NAME
).filter(Objects::nonNull).toList()
);

public static final Map<Integer, List<String>> ORDERED_VALID_FROZEN_ACTIONS = Map.of(
VERSION_ONE,
List.of(UnfollowAction.NAME, SearchableSnapshotAction.NAME)
);
public static final Map<Integer, List<String>> ORDERED_VALID_DELETE_ACTIONS = Map.of(
VERSION_ONE,
List.of(WaitForSnapshotAction.NAME, DeleteAction.NAME)
);

static final Set<String> VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS.get(CURRENT_VERSION));
static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS.get(CURRENT_VERSION));
static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS.get(CURRENT_VERSION));
static final Set<String> VALID_FROZEN_ACTIONS = Sets.newHashSet(ORDERED_VALID_FROZEN_ACTIONS.get(CURRENT_VERSION));
static final Set<String> VALID_DELETE_ACTIONS = Sets.newHashSet(ORDERED_VALID_DELETE_ACTIONS.get(CURRENT_VERSION));

static final Map<String, Set<String>> ALLOWED_ACTIONS = Map.of(
HOT_PHASE,
VALID_HOT_ACTIONS,
WARM_PHASE,
VALID_WARM_ACTIONS,
COLD_PHASE,
VALID_COLD_ACTIONS,
DELETE_PHASE,
VALID_DELETE_ACTIONS,
FROZEN_PHASE,
VALID_FROZEN_ACTIONS
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleActionsRegistry.ALLOWED_ACTIONS;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleActionsRegistry.CURRENT_VERSION;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleActionsRegistry.ORDERED_VALID_COLD_ACTIONS;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleActionsRegistry.ORDERED_VALID_DELETE_ACTIONS;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleActionsRegistry.ORDERED_VALID_FROZEN_ACTIONS;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleActionsRegistry.ORDERED_VALID_HOT_ACTIONS;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleActionsRegistry.ORDERED_VALID_WARM_ACTIONS;

/**
* Represents the lifecycle of an index from creation to deletion. A
Expand All @@ -52,58 +58,6 @@ public class TimeseriesLifecycleType implements LifecycleType {
static final String DELETE_PHASE = "delete";
public static final List<String> ORDERED_VALID_PHASES = List.of(HOT_PHASE, WARM_PHASE, COLD_PHASE, FROZEN_PHASE, DELETE_PHASE);

public static final List<String> ORDERED_VALID_HOT_ACTIONS = Stream.of(
SetPriorityAction.NAME,
UnfollowAction.NAME,
RolloverAction.NAME,
ReadOnlyAction.NAME,
DownsampleAction.NAME,
ShrinkAction.NAME,
ForceMergeAction.NAME,
SearchableSnapshotAction.NAME
).filter(Objects::nonNull).toList();
public static final List<String> ORDERED_VALID_WARM_ACTIONS = Stream.of(
SetPriorityAction.NAME,
UnfollowAction.NAME,
ReadOnlyAction.NAME,
DownsampleAction.NAME,
AllocateAction.NAME,
MigrateAction.NAME,
ShrinkAction.NAME,
ForceMergeAction.NAME
).filter(Objects::nonNull).toList();
public static final List<String> ORDERED_VALID_COLD_ACTIONS = Stream.of(
SetPriorityAction.NAME,
UnfollowAction.NAME,
ReadOnlyAction.NAME,
DownsampleAction.NAME,
SearchableSnapshotAction.NAME,
AllocateAction.NAME,
MigrateAction.NAME,
FreezeAction.NAME
).filter(Objects::nonNull).toList();
public static final List<String> ORDERED_VALID_FROZEN_ACTIONS = List.of(UnfollowAction.NAME, SearchableSnapshotAction.NAME);
public static final List<String> ORDERED_VALID_DELETE_ACTIONS = List.of(WaitForSnapshotAction.NAME, DeleteAction.NAME);

static final Set<String> VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS);
static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
static final Set<String> VALID_FROZEN_ACTIONS = Sets.newHashSet(ORDERED_VALID_FROZEN_ACTIONS);
static final Set<String> VALID_DELETE_ACTIONS = Sets.newHashSet(ORDERED_VALID_DELETE_ACTIONS);

private static final Map<String, Set<String>> ALLOWED_ACTIONS = Map.of(
HOT_PHASE,
VALID_HOT_ACTIONS,
WARM_PHASE,
VALID_WARM_ACTIONS,
COLD_PHASE,
VALID_COLD_ACTIONS,
DELETE_PHASE,
VALID_DELETE_ACTIONS,
FROZEN_PHASE,
VALID_FROZEN_ACTIONS
);

static final Set<String> HOT_ACTIONS_THAT_REQUIRE_ROLLOVER = Set.of(
ReadOnlyAction.NAME,
ShrinkAction.NAME,
Expand Down Expand Up @@ -177,14 +131,40 @@ public static boolean shouldInjectMigrateStepForPhase(Phase phase) {
return true;
}

public List<LifecycleAction> getOrderedActions(Phase phase) {
@Override
public List<LifecycleAction> getOrderedActions(Phase phase, int actionsOrderVersion) {
if (actionsOrderVersion > getLatestActionsOrderVersion()) {
throw new IllegalArgumentException(
"invalid actions order requested [" + actionsOrderVersion + "]. Latest version is [" + getLatestActionsOrderVersion() + "]"
);
}
Map<String, LifecycleAction> actions = phase.getActions();
return switch (phase.getName()) {
case HOT_PHASE -> ORDERED_VALID_HOT_ACTIONS.stream().map(actions::get).filter(Objects::nonNull).collect(toList());
case WARM_PHASE -> ORDERED_VALID_WARM_ACTIONS.stream().map(actions::get).filter(Objects::nonNull).collect(toList());
case COLD_PHASE -> ORDERED_VALID_COLD_ACTIONS.stream().map(actions::get).filter(Objects::nonNull).collect(toList());
case FROZEN_PHASE -> ORDERED_VALID_FROZEN_ACTIONS.stream().map(actions::get).filter(Objects::nonNull).collect(toList());
case DELETE_PHASE -> ORDERED_VALID_DELETE_ACTIONS.stream().map(actions::get).filter(Objects::nonNull).collect(toList());
case HOT_PHASE -> ORDERED_VALID_HOT_ACTIONS.get(actionsOrderVersion)
.stream()
.map(actions::get)
.filter(Objects::nonNull)
.collect(toList());
case WARM_PHASE -> ORDERED_VALID_WARM_ACTIONS.get(actionsOrderVersion)
.stream()
.map(actions::get)
.filter(Objects::nonNull)
.collect(toList());
case COLD_PHASE -> ORDERED_VALID_COLD_ACTIONS.get(actionsOrderVersion)
.stream()
.map(actions::get)
.filter(Objects::nonNull)
.collect(toList());
case FROZEN_PHASE -> ORDERED_VALID_FROZEN_ACTIONS.get(actionsOrderVersion)
.stream()
.map(actions::get)
.filter(Objects::nonNull)
.collect(toList());
case DELETE_PHASE -> ORDERED_VALID_DELETE_ACTIONS.get(actionsOrderVersion)
.stream()
.map(actions::get)
.filter(Objects::nonNull)
.collect(toList());
default -> throw new IllegalArgumentException("lifecycle type [" + TYPE + "] does not support phase [" + phase.getName() + "]");
};
}
Expand Down Expand Up @@ -254,6 +234,11 @@ && definesAllocationRules((AllocateAction) phase.getActions().get(AllocateAction
validateDownsamplingIntervals(phases);
}

@Override
public int getLatestActionsOrderVersion() {
return CURRENT_VERSION;
}

static void validateActionsFollowingSearchableSnapshot(Collection<Phase> phases) {
// invalid configurations can occur if searchable_snapshot is defined in the `hot` phase, with subsequent invalid actions
// being defined in the warm/cold/frozen phases, or if it is defined in the `cold` phase with subsequent invalid actions
Expand Down
Loading

0 comments on commit 05a4cd1

Please sign in to comment.