# Allocation: introduce a new decider that balances the index shard count among nodes #135875
**`IndexBalanceConstraintSettings.java`** (new file, package `org.elasticsearch.cluster.routing.allocation`):

```java
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;

/**
 * Settings definitions for the index shard count allocation decider and associated infrastructure.
 */
public class IndexBalanceConstraintSettings {

    private static final String SETTING_PREFIX = "cluster.routing.allocation.index_balance_decider.";

    public static final Setting<Boolean> INDEX_BALANCE_DECIDER_ENABLED_SETTING = Setting.boolSetting(
        SETTING_PREFIX + "enabled",
        false,
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    /**
     * This setting permits nodes to host more than the ideally balanced number of index shards:
     * maximum tolerated index shard count = ideal + skew_tolerance.
     * For example, with ideal = 4 shards and skew_tolerance = 1, the maximum
     * tolerated index shard count = 4 + 1 = 5.
     */
    public static final Setting<Integer> INDEX_BALANCE_DECIDER_EXCESS_SHARDS = Setting.intSetting(
        SETTING_PREFIX + "excess_shards",
        0,
        0,
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    private volatile boolean deciderEnabled;
    private volatile int excessShards;

    public IndexBalanceConstraintSettings(ClusterSettings clusterSettings) {
        clusterSettings.initializeAndWatch(INDEX_BALANCE_DECIDER_ENABLED_SETTING, enabled -> this.deciderEnabled = enabled);
        clusterSettings.initializeAndWatch(INDEX_BALANCE_DECIDER_EXCESS_SHARDS, value -> this.excessShards = value);
    }

    public boolean isDeciderEnabled() {
        return this.deciderEnabled;
    }

    public int getExcessShards() {
        return this.excessShards;
    }
}
```

Review thread on the `skew_tolerance` wording in the javadoc above:

> **Contributor:** Replace 'skew' in the comment.
>
> **Author:** I am inclined to keep this unchanged; "skew tolerance" is synonymous with "excess shards".
**`IndexBalanceAllocationDecider.java`** (new file, package `org.elasticsearch.cluster.routing.allocation.decider`):

```java
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.cluster.routing.allocation.decider;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.IndexBalanceConstraintSettings;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.index.Index;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND;
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.INDEX_ROLE;
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.SEARCH_ROLE;
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING;

/**
 * For an index of n shards hosted by a cluster of m nodes, a node should not host
 * significantly more than n / m shards. This allocation decider enforces this principle.
 * This allocation decider excludes any nodes flagged for shutdown from consideration
 * when computing optimal shard distributions.
 */
public class IndexBalanceAllocationDecider extends AllocationDecider {

    private static final Logger logger = LogManager.getLogger(IndexBalanceAllocationDecider.class);
    private static final String EMPTY = "";

    public static final String NAME = "index_balance";

    private final IndexBalanceConstraintSettings indexBalanceConstraintSettings;
    private final boolean isStateless;

    private volatile DiscoveryNodeFilters clusterRequireFilters;
    private volatile DiscoveryNodeFilters clusterIncludeFilters;
    private volatile DiscoveryNodeFilters clusterExcludeFilters;

    public IndexBalanceAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
        this.indexBalanceConstraintSettings = new IndexBalanceConstraintSettings(clusterSettings);
        setClusterRequireFilters(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING.getAsMap(settings));
        setClusterExcludeFilters(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING.getAsMap(settings));
        setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
        clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {});
        clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {});
        clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {});
        isStateless = DiscoveryNode.isStateless(settings);
    }

    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        if (indexBalanceConstraintSettings.isDeciderEnabled() == false || isStateless == false || hasFilters()) {
            return allocation.decision(Decision.YES, NAME, "Decider is disabled.");
        }
```
> **Contributor:** If it's disabled for stateful, I wonder if we could just configure it to be added in the stateless plugin? Then we wouldn't need to check …
>
> **Contributor:** Also …
>
> **Author:** Refactored to remove the double negative. Given the follow-up plans to add stateful logic, as well as the protracted nature of this iteration, I am inclined to keep its current placement. I completely agree that placing this in the stateless plugin is a much better choice than the current location. Since the decision to make this a stateless-only decider was made nearly 2 months ago, I'm a bit surprised this option didn't come up earlier across the previous dozens of comments. Maybe the reviewers could concentrate more on the architectural/logic aspects here; those will have a greater impact than the variable naming details, and would have enabled me to take this approach far earlier.
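For reference, the plugin-based alternative discussed in this thread would look roughly like the sketch below, using the existing `ClusterPlugin#createAllocationDeciders` extension point. The plugin class name is hypothetical, and this is a sketch rather than the actual stateless plugin code:

```java
import java.util.Collection;
import java.util.List;

import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.IndexBalanceAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;

// Hypothetical stateless-only plugin: registering the decider here means it
// exists only on stateless deployments, so the isStateless guard inside
// canAllocate could be dropped.
public class StatelessAllocationPlugin extends Plugin implements ClusterPlugin {
    @Override
    public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
        return List.of(new IndexBalanceAllocationDecider(settings, clusterSettings));
    }
}
```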
```java
        Index index = shardRouting.index();
        if (node.hasIndex(index) == false) {
            return allocation.decision(Decision.YES, NAME, "Node does not currently host this index.");
        }

        assert node.node() != null;
        assert node.node().getRoles().contains(INDEX_ROLE) || node.node().getRoles().contains(SEARCH_ROLE);
```
> **Contributor:** Suggested change: …
>
> **Author:** I am inclined to keep it unchanged. In testing, this assert message is superfluous.
>
> **Contributor:** If this assert were to fail (if there is a bug), it would not be clear what event caused the code to fail. I believe all the assert will currently tell you is that the expression was overall false. The idea is that supplying which type of node leaked through would make a potential test failure faster to debug. Not a hill I'll die on, though.
```java
        if (node.node().getRoles().contains(INDEX_ROLE) && shardRouting.primary() == false) {
            return allocation.decision(Decision.YES, NAME, "An index node cannot own search shards. Decider inactive.");
        }

        if (node.node().getRoles().contains(SEARCH_ROLE) && shardRouting.primary()) {
            return allocation.decision(Decision.YES, NAME, "A search node cannot own primary shards. Decider inactive.");
        }
```
> **Contributor:** Could we perhaps combine this into a single check like …?
>
> **Author:** Ack. In hindsight, the deciders ought to have been placed in the stateless repo as part of the stateless plugin. Like I mentioned earlier, this feedback makes absolute sense since the … At this stage, I am inclined to leave this refactoring to a follow-up PR, when `canRemain()` is added.
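The exact single-check refactor suggested above is elided in this view; one plausible form (purely illustrative, not the reviewer's actual snippet) folds both early returns into a role/primary mismatch test:

```java
// Illustrative only. Given the earlier assert that the node has either
// INDEX_ROLE or SEARCH_ROLE, the two early returns above reduce to a
// role/primary mismatch: primaries belong on index nodes, search shards
// on search nodes.
boolean isIndexNode = node.node().getRoles().contains(INDEX_ROLE);
if (isIndexNode != shardRouting.primary()) {
    return allocation.decision(Decision.YES, NAME, "Shard type does not match node role. Decider inactive.");
}
```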
```java
        final ProjectId projectId = allocation.getClusterState().metadata().projectFor(index).id();
        final Set<DiscoveryNode> eligibleNodes = new HashSet<>();
        int totalShards = 0;
        String nomenclature = EMPTY;

        if (node.node().getRoles().contains(INDEX_ROLE)) {
            collectEligibleNodes(allocation, eligibleNodes, INDEX_ROLE);
            // Primary shards only.
            totalShards = allocation.getClusterState().routingTable(projectId).index(index).size();
            nomenclature = "index";
        } else if (node.node().getRoles().contains(SEARCH_ROLE)) {
            collectEligibleNodes(allocation, eligibleNodes, SEARCH_ROLE);
            // Replicas only.
            final IndexMetadata indexMetadata = allocation.getClusterState().metadata().getProject(projectId).index(index);
            totalShards = indexMetadata.getNumberOfShards() * indexMetadata.getNumberOfReplicas();
            nomenclature = "search";
        }

        assert eligibleNodes.isEmpty() == false;
```
> **Contributor:** We haven't already discussed this, have we? I'd think we'd want to exit early rather than assert here. We could say YES, and that there are no non-shutting-down nodes to consider. We know that the node in question has a shard of the index in question. But what if that were the only index node in the cluster, and it's also shutting down?
>
> **Author:** If I'm remembering correctly, this assertion was introduced based on earlier review guidance. The new feedback seems to take a different position, and I'm concerned the shifting expectations may be introducing avoidable delays. Could we clarify the preferred direction?
>
> **Contributor:** There have been multiple evolutions of this code, including changes to the logic in the method filtering down to the … In this case, the preferred direction is making the code robust against failure. I reviewed the code and this seems like an issue. It should be fixed if you agree it's possible, or otherwise discussed why it is not. I could have been wrong in what I previously advised, I'm not sure; I do try to be correct. The best way to avoid this would be to try to make sure you understand why I recommend something: trust, but verify.
>
> **Contributor:** I wondered about this also; if the cluster were shutting down, all nodes would be marked as shutting down and we'd get an empty array here, right?
```java
        if (eligibleNodes.isEmpty()) {
            return allocation.decision(Decision.YES, NAME, "There are no eligible nodes available.");
        }
        assert totalShards > 0;
        final double idealAllocation = Math.ceil((double) totalShards / eligibleNodes.size());

        // Adding the excess shards before division ensures that with tolerance 1 we get:
        // 2 shards, 2 nodes, allow 2 on each
        // 3 shards, 2 nodes, allow 2 on each etc.
        final int threshold = Math.ceilDiv(totalShards + indexBalanceConstraintSettings.getExcessShards(), eligibleNodes.size());
        final int currentAllocation = node.numberOfOwningShardsForIndex(index);

        if (currentAllocation >= threshold) {
            String explanation = Strings.format(
                "There are [%d] eligible nodes in the [%s] tier for assignment of [%d] shards in index [%s]. Ideally no more than [%.0f] "
                    + "shards would be assigned per node (the index balance excess shards setting is [%d]). This node is already assigned"
                    + " [%d] shards of the index.",
                eligibleNodes.size(),
                nomenclature,
                totalShards,
                index,
                idealAllocation,
                indexBalanceConstraintSettings.getExcessShards(),
                currentAllocation
            );

            logger.trace(explanation);
```
> **Contributor:** Should this be …?
>
> **Author:** For this logging statement, 4 different reviewers offered different opinions. At this stage, I am inclined to leave it unchanged.
```java
            return allocation.decision(Decision.NOT_PREFERRED, NAME, explanation);
        }

        return allocation.decision(Decision.YES, NAME, "Node index shard allocation is under the threshold.");
    }

    private void collectEligibleNodes(RoutingAllocation allocation, Set<DiscoveryNode> eligibleNodes, DiscoveryNodeRole role) {
        for (DiscoveryNode discoveryNode : allocation.nodes()) {
            if (discoveryNode.getRoles().contains(role) && allocation.metadata().nodeShutdowns().contains(discoveryNode.getId()) == false) {
                eligibleNodes.add(discoveryNode);
            }
        }
    }

    private void setClusterRequireFilters(Map<String, List<String>> filters) {
        clusterRequireFilters = DiscoveryNodeFilters.trimTier(DiscoveryNodeFilters.buildFromKeyValues(AND, filters));
    }

    private void setClusterIncludeFilters(Map<String, List<String>> filters) {
        clusterIncludeFilters = DiscoveryNodeFilters.trimTier(DiscoveryNodeFilters.buildFromKeyValues(OR, filters));
    }

    private void setClusterExcludeFilters(Map<String, List<String>> filters) {
        clusterExcludeFilters = DiscoveryNodeFilters.trimTier(DiscoveryNodeFilters.buildFromKeyValues(OR, filters));
    }

    private boolean hasFilters() {
        return (clusterExcludeFilters != null && clusterExcludeFilters.hasFilters())
            || (clusterIncludeFilters != null && clusterIncludeFilters.hasFilters())
            || (clusterRequireFilters != null && clusterRequireFilters.hasFilters());
    }
}
```
> **Contributor:** As suggested above, is this something we could configure only in the serverless plugin?
>
> **Author:** Ack. My previous replies agree this is sound advice. At this stage, I am inclined to leave this refactoring to follow-up iterations.
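To make the threshold arithmetic in `canAllocate` concrete, the following standalone snippet (not part of the PR) evaluates the decider's `Math.ceilDiv` formula for a few shard/node/excess combinations, reproducing the cases from the inline comment:

```java
public class ThresholdMathSketch {
    public static void main(String[] args) {
        // threshold = ceil((totalShards + excessShards) / eligibleNodes), as in the decider.
        int[][] cases = { { 2, 2, 1 }, { 3, 2, 1 }, { 4, 2, 0 }, { 10, 4, 2 } };
        for (int[] c : cases) {
            int totalShards = c[0], nodes = c[1], excess = c[2];
            int threshold = Math.ceilDiv(totalShards + excess, nodes);
            System.out.printf("shards=%d nodes=%d excess=%d -> at most %d per node%n",
                totalShards, nodes, excess, threshold);
        }
        // shards=2 nodes=2 excess=1 -> at most 2 per node
        // shards=3 nodes=2 excess=1 -> at most 2 per node
        // shards=4 nodes=2 excess=0 -> at most 2 per node
        // shards=10 nodes=4 excess=2 -> at most 3 per node
    }
}
```

With `excess_shards = 0` the cap is simply the ceiling of a perfectly even spread; each additional unit of excess relaxes the per-node cap by at most one shard.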