Skip to content

Commit

Permalink
[7.17] ILM: fix allocate action to allow only total_shards_per_node (#…
Browse files Browse the repository at this point in the history
…81944) (#82192)

* ILM: fix allocate action to allow only total_shards_per_node (#81944)

The allocate action can specify only `number_of_replicas` (without
routing configuration) but failed if it attempted to only specify
`total_shards_per_node`.

This fixes the action to allow specifying only `total_shards_per_node`.

* Language level adjustments

* Spotless
  • Loading branch information
andreidan committed Jan 4, 2022
1 parent a624548 commit 12ba22c
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,25 @@ public AllocateAction(
} else {
this.require = require;
}
if (this.include.isEmpty() && this.exclude.isEmpty() && this.require.isEmpty() && numberOfReplicas == null) {
if (this.include.isEmpty()
&& this.exclude.isEmpty()
&& this.require.isEmpty()
&& numberOfReplicas == null
&& totalShardsPerNode == null) {
throw new IllegalArgumentException(
"At least one of "
+ INCLUDE_FIELD.getPreferredName()
+ ", "
+ EXCLUDE_FIELD.getPreferredName()
+ " or "
+ REQUIRE_FIELD.getPreferredName()
+ "must contain attributes for action "
+ " must contain attributes for action "
+ NAME
+ ". Otherwise the "
+ NUMBER_OF_REPLICAS_FIELD.getPreferredName()
+ " or the "
+ TOTAL_SHARDS_PER_NODE_FIELD.getPreferredName()
+ " options must be configured."
);
}
if (numberOfReplicas != null && numberOfReplicas < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING;
import static org.elasticsearch.xpack.core.ilm.AllocateAction.NUMBER_OF_REPLICAS_FIELD;
import static org.elasticsearch.xpack.core.ilm.AllocateAction.TOTAL_SHARDS_PER_NODE_FIELD;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class AllocateActionTests extends AbstractActionTestCase<AllocateAction> {

Expand Down Expand Up @@ -58,7 +61,7 @@ static AllocateAction randomInstance() {
requires = randomBoolean() ? null : Collections.emptyMap();
}
Integer numberOfReplicas = randomBoolean() ? null : randomIntBetween(0, 10);
Integer totalShardsPerNode = randomBoolean() ? null : randomIntBetween(-1, 300);
Integer totalShardsPerNode = randomBoolean() ? null : randomIntBetween(-1, 10);
return new AllocateAction(numberOfReplicas, totalShardsPerNode, includes, excludes, requires);
}

Expand All @@ -74,7 +77,7 @@ protected AllocateAction mutateInstance(AllocateAction instance) {
Map<String, String> require = instance.getRequire();
Integer numberOfReplicas = instance.getNumberOfReplicas();
Integer totalShardsPerNode = instance.getTotalShardsPerNode();
switch (randomIntBetween(0, 3)) {
switch (randomIntBetween(0, 4)) {
case 0:
include = new HashMap<>(include);
include.put(randomAlphaOfLengthBetween(11, 15), randomAlphaOfLengthBetween(1, 20));
Expand All @@ -90,6 +93,9 @@ protected AllocateAction mutateInstance(AllocateAction instance) {
case 3:
numberOfReplicas = randomIntBetween(11, 20);
break;
case 4:
totalShardsPerNode = randomIntBetween(11, 20);
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
Expand All @@ -111,8 +117,13 @@ public void testAllMapsNullOrEmpty() {
+ AllocateAction.EXCLUDE_FIELD.getPreferredName()
+ " or "
+ AllocateAction.REQUIRE_FIELD.getPreferredName()
+ "must contain attributes for action "
+ AllocateAction.NAME,
+ " must contain attributes for action "
+ AllocateAction.NAME
+ ". Otherwise the "
+ NUMBER_OF_REPLICAS_FIELD.getPreferredName()
+ " or the "
+ TOTAL_SHARDS_PER_NODE_FIELD.getPreferredName()
+ " options must be configured.",
exception.getMessage()
);
}
Expand All @@ -125,7 +136,7 @@ public void testInvalidNumberOfReplicas() {
IllegalArgumentException.class,
() -> new AllocateAction(randomIntBetween(-1000, -1), randomIntBetween(0, 300), include, exclude, require)
);
assertEquals("[" + AllocateAction.NUMBER_OF_REPLICAS_FIELD.getPreferredName() + "] must be >= 0", exception.getMessage());
assertEquals("[" + NUMBER_OF_REPLICAS_FIELD.getPreferredName() + "] must be >= 0", exception.getMessage());
}

public void testInvalidTotalShardsPerNode() {
Expand All @@ -136,7 +147,7 @@ public void testInvalidTotalShardsPerNode() {
IllegalArgumentException.class,
() -> new AllocateAction(randomIntBetween(0, 300), randomIntBetween(-1000, -2), include, exclude, require)
);
assertEquals("[" + AllocateAction.TOTAL_SHARDS_PER_NODE_FIELD.getPreferredName() + "] must be >= -1", exception.getMessage());
assertEquals("[" + TOTAL_SHARDS_PER_NODE_FIELD.getPreferredName() + "] must be >= -1", exception.getMessage());
}

public static Map<String, String> randomAllocationRoutingMap(int minEntries, int maxEntries) {
Expand Down Expand Up @@ -208,6 +219,10 @@ public void testTotalNumberOfShards() throws Exception {
steps = action.toSteps(null, phase, nextStepKey);
firstStep = (UpdateSettingsStep) steps.get(0);
assertEquals(null, firstStep.getSettings().get(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey()));

// allow an allocate action that only specifies total shards per node (don't expect any exceptions in this case)
action = new AllocateAction(null, 5, null, null, null);
assertThat(action.getTotalShardsPerNode(), is(5));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,8 @@ private static LifecyclePolicy migrateSingleILMPolicy(String nodeAttrName, Lifec
Map<String, LifecycleAction> actionMap = new HashMap<>(phase.getActions());
// this phase contains an allocate action that defines a require rule for the attribute name so we'll remove all the
// rules to allow for the migrate action to be injected
if (allocateAction.getNumberOfReplicas() != null) {
// keep the number of replicas configuration
if (allocateAction.getNumberOfReplicas() != null || allocateAction.getTotalShardsPerNode() != null) {
// keep the number of replicas configuration and/or the total shards per node configuration
AllocateAction updatedAllocateAction = new AllocateAction(
allocateAction.getNumberOfReplicas(),
allocateAction.getTotalShardsPerNode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING;
Expand Down Expand Up @@ -286,6 +287,65 @@ public void testMigrateIlmPolicyRefreshesCachedPhase() {
assertThat(allocateDef.get("require"), is(Collections.emptyMap()));
}

{
// index is in the cold phase and the migrated allocate action is not removed due to allocate specifying
// total_shards_per_node
LifecyclePolicyMetadata policyMetadataWithTotalShardsPerNode = getWarmColdPolicyMeta(
warmSetPriority,
shrinkAction,
warmAllocateAction,
new AllocateAction(null, 1, null, null, org.elasticsearch.core.Map.of("data", "cold"))
);

LifecycleExecutionState preMigrationExecutionState = LifecycleExecutionState.builder()
.setPhase("cold")
.setAction("allocate")
.setStep("allocate")
.setPhaseDefinition(getColdPhaseDefinitionWithTotalShardsPerNode())
.build();

IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName)
.settings(getBaseIndexSettings())
.putCustom(ILM_CUSTOM_METADATA_KEY, preMigrationExecutionState.asMap());

ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
.metadata(
Metadata.builder()
.putCustom(
IndexLifecycleMetadata.TYPE,
new IndexLifecycleMetadata(
Collections.singletonMap(
policyMetadataWithTotalShardsPerNode.getName(),
policyMetadataWithTotalShardsPerNode
),
OperationMode.STOPPED
)
)
.put(indexMetadata)
.build()
)
.build();

Metadata.Builder newMetadata = Metadata.builder(state.metadata());
List<String> migratedPolicies = migrateIlmPolicies(newMetadata, state, "data", REGISTRY, client, null);

assertThat(migratedPolicies.get(0), is(lifecycleName));
ClusterState newState = ClusterState.builder(state).metadata(newMetadata).build();
LifecycleExecutionState newLifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.metadata().index(indexName));

Map<String, Object> migratedPhaseDefAsMap = getPhaseDefinitionAsMap(newLifecycleState);

// expecting the phase definition to be refreshed with the migrated phase representation
// ie. allocate action does not contain any allocation rules
Map<String, Object> actions = (Map<String, Object>) migratedPhaseDefAsMap.get("actions");
assertThat(actions.size(), is(1));
Map<String, Object> allocateDef = (Map<String, Object>) actions.get(AllocateAction.NAME);
assertThat(allocateDef, notNullValue());
assertThat(allocateDef.get("include"), is(org.elasticsearch.core.Map.of()));
assertThat(allocateDef.get("exclude"), is(org.elasticsearch.core.Map.of()));
assertThat(allocateDef.get("require"), is(org.elasticsearch.core.Map.of()));
}

{
// index is in the warm phase executing the allocate action, the migrated allocate action is removed
LifecycleExecutionState preMigrationExecutionState = LifecycleExecutionState.builder()
Expand Down Expand Up @@ -1002,6 +1062,28 @@ private String getWarmPhaseDef() {
+ " }";
}

private String getColdPhaseDefinitionWithTotalShardsPerNode() {
return String.format(
Locale.ROOT,
" {\n"
+ " \"policy\": \"%s\",\n"
+ " \"phase_definition\": {\n"
+ " \"min_age\": \"0m\",\n"
+ " \"actions\": {\n"
+ " \"allocate\": {\n"
+ " \"total_shards_per_node\": \"1\",\n"
+ " \"require\": {\n"
+ " \"data\": \"cold\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"version\": 1,\n"
+ " \"modified_date_in_millis\": 1578521007076 \n}",
lifecycleName
);
}

private String getColdPhaseDefinition() {
return "{\n"
+ " \"policy\" : \""
Expand Down

0 comments on commit 12ba22c

Please sign in to comment.