diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index e1e9444d983fb..53c2477530708 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -58,6 +58,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.IndexBalanceAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.IndexVersionAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider; @@ -497,6 +498,7 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new ThrottlingAllocationDecider(clusterSettings)); addAllocationDecider(deciders, new ShardsLimitAllocationDecider(clusterSettings)); addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings)); + addAllocationDecider(deciders, new IndexBalanceAllocationDecider(settings, clusterSettings)); clusterPlugins.stream() .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream()) diff --git a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java index 4251acfa301f2..c1bbc345633d9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java +++ b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java @@ -250,6 +250,10 @@ private boolean isSingleNodeFilterInternal() { || (filters.size() > 1 && opType == OpType.AND && 
NON_ATTRIBUTE_NAMES.containsAll(filters.keySet())); } + public boolean hasFilters() { + return filters.isEmpty() == false; + } + /** * Generates a human-readable string for the DiscoverNodeFilters. * Example: {@code _id:"id1 OR blah",name:"blah OR name2"} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexBalanceConstraintSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexBalanceConstraintSettings.java new file mode 100644 index 0000000000000..775ecce8f4774 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexBalanceConstraintSettings.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; + +/** + * Settings definitions for the index balance allocation decider and associated infrastructure + */ +public class IndexBalanceConstraintSettings { + + private static final String SETTING_PREFIX = "cluster.routing.allocation.index_balance_decider."; + + public static final Setting<Boolean> INDEX_BALANCE_DECIDER_ENABLED_SETTING = Setting.boolSetting( + SETTING_PREFIX + "enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * This setting permits nodes to host more than the ideally balanced number of index shards. + * Maximum tolerated index shard count = ceil((total shards + excess_shards) / eligible nodes) + * e.g.
8 shards on 2 eligible nodes, excess_shards = 1 + * maximum tolerated index shards = ceil((8 + 1) / 2) = 5. + */ + public static final Setting<Integer> INDEX_BALANCE_DECIDER_EXCESS_SHARDS = Setting.intSetting( + SETTING_PREFIX + "excess_shards", + 0, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private volatile boolean deciderEnabled; + private volatile int excessShards; + + public IndexBalanceConstraintSettings(ClusterSettings clusterSettings) { + clusterSettings.initializeAndWatch(INDEX_BALANCE_DECIDER_ENABLED_SETTING, enabled -> this.deciderEnabled = enabled); + clusterSettings.initializeAndWatch(INDEX_BALANCE_DECIDER_EXCESS_SHARDS, value -> this.excessShards = value); + } + + public boolean isDeciderEnabled() { + return this.deciderEnabled; + } + + public int getExcessShards() { + return this.excessShards; + } + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index be7b5220ed333..d6cb27bdf088e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -1581,6 +1581,11 @@ private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, ProjectIn logger.trace("No shards of [{}] can relocate from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId()); return false; } + + // Visible for testing. + public RoutingAllocation getAllocation() { + return this.allocation; + } } public static class ModelNode implements Iterable { @@ -1824,7 +1829,8 @@ public WeightFunction getWeightFunction() { } } - record ProjectIndex(ProjectId project, String indexName) { + // Visible for testing.
+ public record ProjectIndex(ProjectId project, String indexName) { ProjectIndex(RoutingAllocation allocation, ShardRouting shard) { this(allocation.metadata().projectFor(shard.index()).id(), shard.getIndexName()); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java index 5b58b1d022590..74d90ff87f441 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java @@ -60,7 +60,8 @@ public WeightFunction(float shardBalance, float indexBalance, float writeLoadBal theta3 = diskUsageBalance / sum; } - float calculateNodeWeightWithIndex( + // Visible for testing + public float calculateNodeWeightWithIndex( BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node, ProjectIndex index diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java index a7f0aa3cea89f..91e3b42bbb46d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -62,9 +62,9 @@ public class FilterAllocationDecider extends AllocationDecider { public static final String NAME = "filter"; - private static final String CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require"; - private static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include"; - private static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude"; + public static final String 
CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require"; + public static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include"; + public static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude"; public static final Setting.AffixSetting> CLUSTER_ROUTING_REQUIRE_GROUP_SETTING = Setting.prefixKeySetting( CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".", diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/IndexBalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/IndexBalanceAllocationDecider.java new file mode 100644 index 0000000000000..289b003c1d08e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/IndexBalanceAllocationDecider.java @@ -0,0 +1,172 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeFilters; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.IndexBalanceConstraintSettings; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Strings; +import org.elasticsearch.index.Index; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND; +import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR; +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.INDEX_ROLE; +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.SEARCH_ROLE; +import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING; + +/** + * For an index of n shards hosted by a cluster of m nodes, a node should not host + * significantly more than n / m shards. This allocation decider enforces this principle. 
+ * This allocation decider excludes any nodes flagged for shutdown from consideration + * when computing optimal shard distributions. + */ +public class IndexBalanceAllocationDecider extends AllocationDecider { + + private static final Logger logger = LogManager.getLogger(IndexBalanceAllocationDecider.class); + private static final String EMPTY = ""; + + public static final String NAME = "index_balance"; + + private final IndexBalanceConstraintSettings indexBalanceConstraintSettings; + private final boolean isStateless; + + private volatile DiscoveryNodeFilters clusterRequireFilters; + private volatile DiscoveryNodeFilters clusterIncludeFilters; + private volatile DiscoveryNodeFilters clusterExcludeFilters; + + public IndexBalanceAllocationDecider(Settings settings, ClusterSettings clusterSettings) { + this.indexBalanceConstraintSettings = new IndexBalanceConstraintSettings(clusterSettings); + setClusterRequireFilters(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING.getAsMap(settings)); + setClusterExcludeFilters(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING.getAsMap(settings)); + setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings)); + clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {}); + clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {}); + clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {}); + isStateless = DiscoveryNode.isStateless(settings); + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (indexBalanceConstraintSettings.isDeciderEnabled() == false || isStateless == false || hasFilters()) { + return allocation.decision(Decision.YES, NAME, "Decider is disabled."); + } + + Index index = shardRouting.index(); + if (node.hasIndex(index) == false) { + 
return allocation.decision(Decision.YES, NAME, "Node does not currently host this index."); + } + + assert node.node() != null; + assert node.node().getRoles().contains(INDEX_ROLE) || node.node().getRoles().contains(SEARCH_ROLE); + + if (node.node().getRoles().contains(INDEX_ROLE) && shardRouting.primary() == false) { + return allocation.decision(Decision.YES, NAME, "An index node cannot own search shards. Decider inactive."); + } + + if (node.node().getRoles().contains(SEARCH_ROLE) && shardRouting.primary()) { + return allocation.decision(Decision.YES, NAME, "A search node cannot own primary shards. Decider inactive."); + } + + final ProjectId projectId = allocation.getClusterState().metadata().projectFor(index).id(); + final Set eligibleNodes = new HashSet<>(); + int totalShards = 0; + String nomenclature = EMPTY; + + if (node.node().getRoles().contains(INDEX_ROLE)) { + collectEligibleNodes(allocation, eligibleNodes, INDEX_ROLE); + // Primary shards only. + totalShards = allocation.getClusterState().routingTable(projectId).index(index).size(); + nomenclature = "index"; + } else if (node.node().getRoles().contains(SEARCH_ROLE)) { + collectEligibleNodes(allocation, eligibleNodes, SEARCH_ROLE); + // Replicas only. + final IndexMetadata indexMetadata = allocation.getClusterState().metadata().getProject(projectId).index(index); + totalShards = indexMetadata.getNumberOfShards() * indexMetadata.getNumberOfReplicas(); + nomenclature = "search"; + } + + assert eligibleNodes.isEmpty() == false; + if (eligibleNodes.isEmpty()) { + return allocation.decision(Decision.YES, NAME, "There are no eligible nodes available."); + } + assert totalShards > 0; + final double idealAllocation = Math.ceil((double) totalShards / eligibleNodes.size()); + + // Adding the excess shards before division ensures that with tolerance 1 we get: + // 2 shards, 2 nodes, allow 2 on each + // 3 shards, 2 nodes, allow 2 on each etc. 
+ final int threshold = Math.ceilDiv(totalShards + indexBalanceConstraintSettings.getExcessShards(), eligibleNodes.size()); + final int currentAllocation = node.numberOfOwningShardsForIndex(index); + + if (currentAllocation >= threshold) { + String explanation = Strings.format( + "There are [%d] eligible nodes in the [%s] tier for assignment of [%d] shards in index [%s]. Ideally no more than [%.0f] " + + "shard would be assigned per node (the index balance excess shards setting is [%d]). This node is already assigned" + + " [%d] shards of the index.", + eligibleNodes.size(), + nomenclature, + totalShards, + index, + idealAllocation, + indexBalanceConstraintSettings.getExcessShards(), + currentAllocation + ); + + logger.trace(explanation); + + return allocation.decision(Decision.NOT_PREFERRED, NAME, explanation); + } + + return allocation.decision(Decision.YES, NAME, "Node index shard allocation is under the threshold."); + } + + private void collectEligibleNodes(RoutingAllocation allocation, Set eligibleNodes, DiscoveryNodeRole role) { + for (DiscoveryNode discoveryNode : allocation.nodes()) { + if (discoveryNode.getRoles().contains(role) && allocation.metadata().nodeShutdowns().contains(discoveryNode.getId()) == false) { + eligibleNodes.add(discoveryNode); + } + } + } + + private void setClusterRequireFilters(Map> filters) { + clusterRequireFilters = DiscoveryNodeFilters.trimTier(DiscoveryNodeFilters.buildFromKeyValues(AND, filters)); + } + + private void setClusterIncludeFilters(Map> filters) { + clusterIncludeFilters = DiscoveryNodeFilters.trimTier(DiscoveryNodeFilters.buildFromKeyValues(OR, filters)); + } + + private void setClusterExcludeFilters(Map> filters) { + clusterExcludeFilters = DiscoveryNodeFilters.trimTier(DiscoveryNodeFilters.buildFromKeyValues(OR, filters)); + } + + private boolean hasFilters() { + return (clusterExcludeFilters != null && clusterExcludeFilters.hasFilters()) + || (clusterIncludeFilters != null && clusterIncludeFilters.hasFilters()) 
+ || (clusterRequireFilters != null && clusterRequireFilters.hasFilters()); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 5ad7c2af94c81..3c73cd0e7b462 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -46,6 +46,7 @@ import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; +import org.elasticsearch.cluster.routing.allocation.IndexBalanceConstraintSettings; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundSummaryService; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; @@ -659,6 +660,8 @@ public void apply(Settings value, Settings current, Settings previous) { WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_DURATION_SETTING, WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING, WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, + IndexBalanceConstraintSettings.INDEX_BALANCE_DECIDER_ENABLED_SETTING, + IndexBalanceConstraintSettings.INDEX_BALANCE_DECIDER_EXCESS_SHARDS, WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_MINIMUM_LOGGING_INTERVAL, SamplingService.TTL_POLL_INTERVAL_SETTING, BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING, diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index f63fe302004eb..3c2b0a263fa0f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.IndexBalanceAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.IndexVersionAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider; @@ -286,7 +287,8 @@ public void testAllocationDeciderOrder() { DiskThresholdDecider.class, ThrottlingAllocationDecider.class, ShardsLimitAllocationDecider.class, - AwarenessAllocationDecider.class + AwarenessAllocationDecider.class, + IndexBalanceAllocationDecider.class ); Collection deciders = ClusterModule.createAllocationDeciders( Settings.EMPTY, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 6386b56759dcf..49e90245561ce 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -1277,7 +1277,7 @@ private static class NodeNameDrivenWeightFunction extends WeightFunction { } @Override - float calculateNodeWeightWithIndex( + public float calculateNodeWeightWithIndex( BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node, BalancedShardsAllocator.ProjectIndex index diff --git 
a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/IndexBalanceAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/IndexBalanceAllocationDeciderTests.java new file mode 100644 index 0000000000000..aa9324bfaf1e2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/IndexBalanceAllocationDeciderTests.java @@ -0,0 +1,365 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.GlobalRoutingTable; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodesHelper; +import org.elasticsearch.cluster.routing.RoutingTable; 
+import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.allocation.IndexBalanceConstraintSettings; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.index.shard.ShardId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder; +import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX; +import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX; +import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX; +import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings; + +public class IndexBalanceAllocationDeciderTests extends ESAllocationTestCase { + + public static final String INCLUDE_DISCOVERY_NODE_FILTERS = "include.discovery.node.filters"; + public static final String ALLOW_EXCESS_SHARDS = "allow.excess.shards"; + + private DiscoveryNode indexNodeOne; + private DiscoveryNode indexNodeTwo; + private DiscoveryNode searchNodeOne; + private DiscoveryNode searchNodeTwo; + private DiscoveryNode masterNode; + private DiscoveryNode machineLearningNode; + + private RoutingNode routingIndexNodeOne; + private RoutingNode routingIndexNodeTwo; + private RoutingNode routingSearchNodeOne; + private RoutingNode 
routingSearchNodeTwo; + private RoutingNode routingMachineLearningNode; + + private List allNodes; + private int numberOfPrimaryShards; + private int replicationFactor; + private ClusterState clusterState; + private IndexMetadata indexMetadata; + private RoutingAllocation routingAllocation; + private IndexBalanceAllocationDecider indexBalanceAllocationDecider; + private int excessShards; + private ShardRouting indexTierShardRouting; + private ShardRouting searchTierShardRouting; + private List indexTier; + private List searchIier; + + private void setup(Settings settings) { + boolean hasDiscoveryNodeFilters = settings.getAsBoolean(INCLUDE_DISCOVERY_NODE_FILTERS, true); + boolean allowExcessShards = settings.getAsBoolean(ALLOW_EXCESS_SHARDS, true); + + final String indexName = "IndexBalanceAllocationDeciderIndex"; + final Map> nodeToShardRoutings = new HashMap<>(); + + excessShards = allowExcessShards ? randomIntBetween(1, 5) : 0; + + Settings.Builder builder = Settings.builder() + .put("stateless.enabled", "true") + .put(IndexBalanceConstraintSettings.INDEX_BALANCE_DECIDER_ENABLED_SETTING.getKey(), "true") + .put(IndexBalanceConstraintSettings.INDEX_BALANCE_DECIDER_EXCESS_SHARDS.getKey(), excessShards); + + numberOfPrimaryShards = randomIntBetween(2, 10) * 2; + replicationFactor = 2; + + indexNodeOne = DiscoveryNodeUtils.builder("indexNodeOne").roles(Collections.singleton(DiscoveryNodeRole.INDEX_ROLE)).build(); + indexNodeTwo = DiscoveryNodeUtils.builder("indexNodeTwo").roles(Collections.singleton(DiscoveryNodeRole.INDEX_ROLE)).build(); + searchNodeOne = DiscoveryNodeUtils.builder("searchNodeOne").roles(Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE)).build(); + searchNodeTwo = DiscoveryNodeUtils.builder("searchNodeTwo").roles(Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE)).build(); + masterNode = DiscoveryNodeUtils.builder("masterNode").roles(Collections.singleton(DiscoveryNodeRole.MASTER_ROLE)).build(); + machineLearningNode = 
DiscoveryNodeUtils.builder("machineLearningNode") + .roles(Collections.singleton(DiscoveryNodeRole.ML_ROLE)) + .build(); + allNodes = List.of(indexNodeOne, indexNodeTwo, searchNodeOne, searchNodeTwo, masterNode, machineLearningNode); + + if (hasDiscoveryNodeFilters) { + String setting = randomFrom( + CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX, + CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX, + CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ); + String attribute = randomFrom("_value", "name"); + String name = randomFrom("indexNodeOne", "indexNodeTwo", "searchNodeOne", "searchNodeTwo"); + String ip = randomFrom("192.168.0.1", "192.168.0.2", "192.168.7.1", "10.17.0.1"); + builder.put(setting + "." + attribute, attribute.equals("name") ? name : ip); + } + + DiscoveryNodes.Builder discoveryNodeBuilder = DiscoveryNodes.builder(); + for (DiscoveryNode node : allNodes) { + discoveryNodeBuilder.add(node); + } + + ProjectId projectId = ProjectId.fromId("test-IndexBalanceAllocationDecider"); + ClusterState.Builder state = ClusterState.builder(new ClusterName("test-IndexBalanceAllocationDecider")); + + final ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(projectId); + + indexMetadata = IndexMetadata.builder(indexName) + .settings( + indexSettings(IndexVersion.current(), numberOfPrimaryShards, replicationFactor).put( + SETTING_CREATION_DATE, + System.currentTimeMillis() + ).build() + ) + .timestampRange(IndexLongFieldRange.UNKNOWN) + .eventIngestedRange(IndexLongFieldRange.UNKNOWN) + .build(); + + projectBuilder.put(indexMetadata, false); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Metadata.Builder metadataBuilder = Metadata.builder(); + + ShardId[] shardIds = new ShardId[numberOfPrimaryShards]; + for (int i = 0; i < numberOfPrimaryShards; i++) { + shardIds[i] = new ShardId(indexMetadata.getIndex(), i); + IndexShardRoutingTable.Builder 
indexShardRoutingBuilder = IndexShardRoutingTable.builder(shardIds[i]); + + DiscoveryNode indexNode = i % 2 == 0 ? indexNodeOne : indexNodeTwo; + ShardRouting primaryShardRouting = TestShardRouting.newShardRouting( + shardIds[i], + indexNode.getId(), + null, + true, + ShardRoutingState.STARTED + ); + indexShardRoutingBuilder.addShard(primaryShardRouting); + nodeToShardRoutings.putIfAbsent(indexNode, new ArrayList<>()); + nodeToShardRoutings.get(indexNode).add(primaryShardRouting); + + for (int j = 1; j <= replicationFactor; j++) { + DiscoveryNode searchNode = j % 2 == 0 ? searchNodeOne : searchNodeTwo; + ShardRouting replicaShardRouting = shardRoutingBuilder(shardIds[i], searchNode.getId(), false, ShardRoutingState.STARTED) + .withRole(ShardRouting.Role.DEFAULT) + .build(); + indexShardRoutingBuilder.addShard(replicaShardRouting); + nodeToShardRoutings.putIfAbsent(searchNode, new ArrayList<>()); + nodeToShardRoutings.get(searchNode).add(replicaShardRouting); + } + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + } + + routingMachineLearningNode = RoutingNodesHelper.routingNode(machineLearningNode.getId(), machineLearningNode); + metadataBuilder.put(projectBuilder).generateClusterUuidIfNeeded(); + state.nodes(discoveryNodeBuilder); + state.metadata(metadataBuilder); + state.routingTable(GlobalRoutingTable.builder().put(projectId, routingTableBuilder).build()); + clusterState = state.build(); + + routingIndexNodeOne = RoutingNodesHelper.routingNode( + indexNodeOne.getId(), + indexNodeOne, + nodeToShardRoutings.get(indexNodeOne).toArray(new ShardRouting[0]) + ); + routingIndexNodeTwo = RoutingNodesHelper.routingNode( + indexNodeTwo.getId(), + indexNodeTwo, + nodeToShardRoutings.get(indexNodeTwo).toArray(new ShardRouting[0]) + ); + routingSearchNodeOne = RoutingNodesHelper.routingNode( + searchNodeOne.getId(), + searchNodeOne, + nodeToShardRoutings.get(searchNodeOne).toArray(new 
ShardRouting[0]) + ); + routingSearchNodeTwo = RoutingNodesHelper.routingNode( + searchNodeTwo.getId(), + searchNodeTwo, + nodeToShardRoutings.get(searchNodeTwo).toArray(new ShardRouting[0]) + ); + + ClusterInfo clusterInfo = ClusterInfo.builder().build(); + routingAllocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, clusterInfo, null, System.nanoTime()); + routingAllocation.setDebugMode(RoutingAllocation.DebugMode.ON); + + indexBalanceAllocationDecider = new IndexBalanceAllocationDecider(builder.build(), createBuiltInClusterSettings(builder.build())); + + indexTierShardRouting = TestShardRouting.newShardRouting( + new ShardId(indexMetadata.getIndex(), 1), + randomFrom(indexNodeOne, indexNodeTwo).getId(), + null, + true, + ShardRoutingState.STARTED + ); + + searchTierShardRouting = TestShardRouting.newShardRouting( + new ShardId(indexMetadata.getIndex(), 1), + randomFrom(searchNodeOne, searchNodeTwo).getId(), + null, + false, + ShardRoutingState.STARTED + ); + + indexTier = List.of(routingIndexNodeOne, routingIndexNodeTwo); + searchIier = List.of(routingSearchNodeOne, routingSearchNodeTwo); + } + + public void testCanAllocateUnderThresholdWithExcessShards() { + Settings testSettings = Settings.builder().put(INCLUDE_DISCOVERY_NODE_FILTERS, false).put(ALLOW_EXCESS_SHARDS, true).build(); + setup(testSettings); + ShardRouting newIndexShardRouting = TestShardRouting.newShardRouting( + new ShardId("newIndex", "uuid", 1), + indexNodeTwo.getId(), + null, + true, + ShardRoutingState.STARTED + ); + + for (RoutingNode routingNode : List.of( + routingIndexNodeOne, + routingIndexNodeTwo, + routingSearchNodeOne, + routingSearchNodeTwo, + routingMachineLearningNode + )) { + assertDecisionMatches( + "Assigning a new index to a node should succeed", + indexBalanceAllocationDecider.canAllocate(newIndexShardRouting, routingNode, routingAllocation), + Decision.Type.YES, + "Node does not currently host this index." 
+ ); + } + + for (RoutingNode routingNode : searchIier) { + assertDecisionMatches( + "Assigning a new primary shard to a search tier node should succeed", + indexBalanceAllocationDecider.canAllocate(indexTierShardRouting, routingNode, routingAllocation), + Decision.Type.YES, + "A search node cannot own primary shards. Decider inactive." + ); + } + + for (RoutingNode routingNode : indexTier) { + assertDecisionMatches( + "Assigning a replica shard to a index tier node should succeed", + indexBalanceAllocationDecider.canAllocate(searchTierShardRouting, routingNode, routingAllocation), + Decision.Type.YES, + "An index node cannot own search shards. Decider inactive." + ); + } + + verifyCanAllocate(); + } + + private void verifyCanAllocate() { + for (RoutingNode routingNode : indexTier) { + assertDecisionMatches( + "Assigning an additional primary shard to an index node has capacity should succeed", + indexBalanceAllocationDecider.canAllocate(indexTierShardRouting, routingNode, routingAllocation), + Decision.Type.YES, + "Node index shard allocation is under the threshold." + ); + } + + for (RoutingNode routingNode : searchIier) { + assertDecisionMatches( + "Assigning an additional replica shard to an search node has capacity should succeed", + indexBalanceAllocationDecider.canAllocate(searchTierShardRouting, routingNode, routingAllocation), + Decision.Type.YES, + "Node index shard allocation is under the threshold." 
+ ); + } + } + + public void testCanAllocateExceedThreshold() { + Settings testSettings = Settings.builder().put(INCLUDE_DISCOVERY_NODE_FILTERS, false).put(ALLOW_EXCESS_SHARDS, false).build(); + setup(testSettings); + + int ideal = numberOfPrimaryShards / 2; + int current = numberOfPrimaryShards / 2; + + assertDecisionMatches( + "Assigning an additional primary shard to an index node at capacity should fail", + indexBalanceAllocationDecider.canAllocate(indexTierShardRouting, routingIndexNodeOne, routingAllocation), + Decision.Type.NOT_PREFERRED, + "There are [2] eligible nodes in the [index] tier for assignment of [" + + numberOfPrimaryShards + + "] shards in index [[IndexBalanceAllocationDeciderIndex]]. Ideally no more than [" + + ideal + + "] shard would be assigned per node (the index balance excess shards setting is [0]). This node is already assigned [" + + current + + "] shards of the index." + ); + + int total = numberOfPrimaryShards * replicationFactor; + ideal = numberOfPrimaryShards * replicationFactor / 2; + current = numberOfPrimaryShards * replicationFactor / 2; + + assertDecisionMatches( + "Assigning an additional replica shard to an replica node at capacity should fail", + indexBalanceAllocationDecider.canAllocate(searchTierShardRouting, routingSearchNodeOne, routingAllocation), + Decision.Type.NOT_PREFERRED, + "There are [2] eligible nodes in the [search] tier for assignment of [" + + total + + "] shards in index [[IndexBalanceAllocationDeciderIndex]]. Ideally no more than [" + + ideal + + "] shard would be assigned per node (the index balance excess shards setting is [0]). This node is already assigned [" + + current + + "] shards of the index." 
+ ); + } + + public void testCanAllocateHasDiscoveryNodeFilters() { + Settings testSettings = Settings.builder() + .put(INCLUDE_DISCOVERY_NODE_FILTERS, true) + .put(ALLOW_EXCESS_SHARDS, randomBoolean()) + .build(); + setup(testSettings); + + for (RoutingNode routingNode : indexTier) { + assertDecisionMatches( + "Having DiscoveryNodeFilters disables this decider", + indexBalanceAllocationDecider.canAllocate(indexTierShardRouting, routingNode, routingAllocation), + Decision.Type.YES, + "Decider is disabled." + ); + } + + for (RoutingNode routingNode : searchIier) { + assertDecisionMatches( + "Having DiscoveryNodeFilters disables this decider", + indexBalanceAllocationDecider.canAllocate(searchTierShardRouting, routingNode, routingAllocation), + Decision.Type.YES, + "Decider is disabled." + ); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java index adbf5e6bac9cd..324292a13cf8f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -29,8 +29,6 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; @@ -371,18 +369,6 @@ public void testWriteLoadDeciderShouldPreventBalancerMovingShardsBack() { assertEquals(3, 
routingAllocation.routingNodes().node(otherNode.getId()).numberOfOwningShards()); } - private void assertDecisionMatches(String description, Decision decision, Decision.Type type, String explanationPattern) { - assertEquals(description, type, decision.type()); - if (explanationPattern == null) { - assertNull(decision.getExplanation()); - } else { - assertTrue( - Strings.format("Expected: \"%s\", got \"%s\"", explanationPattern, decision.getExplanation()), - Regex.simpleMatch(explanationPattern, decision.getExplanation()) - ); - } - } - /** * Carries all the cluster state objects needed for testing after {@link #createClusterStateAndRoutingAllocation} sets them up. */ diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 34fc441389137..f0f25e98f429a 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -44,6 +44,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; @@ -403,6 +404,18 @@ public static ClusterState reroute(AllocationService allocationService, ClusterS return result; } + public static void assertDecisionMatches(String description, Decision decision, Decision.Type type, String explanationPattern) { + assertEquals(description, type, decision.type()); + if (explanationPattern == null) { + assertNull(decision.getExplanation()); + } else { + assertTrue( + org.elasticsearch.common.Strings.format("Expected: 
\"%s\", got \"%s\"", explanationPattern, decision.getExplanation()), + Regex.simpleMatch(explanationPattern, decision.getExplanation()) + ); + } + } + public static class TestAllocateDecision extends AllocationDecider { private final Decision decision;