
Conversation

@zhubotang-wq
Contributor

@zhubotang-wq zhubotang-wq commented Oct 2, 2025

For a cluster with n data nodes hosting an index with m shards, each node should ideally host not significantly more than m / n shards of that index. This new allocation decider acts on that principle and responds with NOT_PREFERRED when a node would otherwise be allocated more shards of the index than the threshold.

Nodes in shutdown are excluded when computing the fair workload per node. A load skew tolerance setting is added to permit nodes to own more than the ideal number of shards for the index.
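Roughly, the check amounts to the following (a minimal sketch with hypothetical names, not the exact code in this PR; the real decider works against RoutingAllocation and returns a Decision rather than a boolean, and the tolerance is expressed here simply as extra shards allowed):

    // Hypothetical sketch of the per-index check: m shards of the index, n eligible nodes
    // (nodes in shutdown already excluded), excessShardsAllowed = skew tolerance as extra shards.
    static boolean allocationNotPreferred(int totalShardsOfIndex, int eligibleNodeCount, int excessShardsAllowed, int shardsOfIndexOnNode) {
        // fair share per node via ceiling division, plus the configured allowance
        int threshold = Math.ceilDiv(totalShardsOfIndex + excessShardsAllowed, eligibleNodeCount);
        return shardsOfIndexOnNode >= threshold;
    }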

Relates ES-12080

In a balanced allocation, for an index with n shards on a cluster
of m nodes, each node should host not significantly more than n / m
shards. This decider enforces this principle.
@zhubotang-wq zhubotang-wq requested a review from a team as a code owner October 2, 2025 20:11
@zhubotang-wq zhubotang-wq added >enhancement :Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) :Distributed Coordination/Distributed A catch all label for anything in the Distributed Coordination area. Please avoid if you can. labels Oct 2, 2025
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

@elasticsearchmachine elasticsearchmachine added Team:Distributed Coordination Meta label for Distributed Coordination team v9.3.0 labels Oct 2, 2025
@zhubotang-wq
Contributor Author

This is still very much a work in progress (additional unit tests still need to be consolidated). I published this PR early to start gathering feedback sooner.

@elasticsearchmachine
Collaborator

Hi @zhubotang-wq, I've created a changelog YAML for you.

Contributor

@DiannaHohensee DiannaHohensee left a comment


I took a first pass. I didn't have time to take a peek at the testing yet.

@zhubotang-wq zhubotang-wq removed the :Distributed Coordination/Distributed A catch all label for anything in the Distributed Coordination area. Please avoid if you can. label Oct 14, 2025
@zhubotang-wq
Contributor Author

Noticed an issue during testing: in a perfectly balanced cluster, if the cluster is asked to split an index further, the index shard count balance decider needs to use the newly proposed (post-split) shard count to calculate the fair workload. However, this new shard count is not available in the cluster state.

At least `allocation.getClusterState().routingTable(ProjectId.DEFAULT).index(index).size()` still gives the old shard count.

This creates a case where all nodes are asked to host additional shards, but all consider themselves already doing enough.

In the FullClusterRestartIT integration test case, when the split is requested via

client().performRequest(newXContentRequest(HttpMethod.PUT, "/" + index + "/_split/" + target, (builder, params) -> {

all nodes say that they have already done enough:

[2025-10-16T04:56:00,074][DEBUG][o.e.c.r.a.d.IndexShardCountAllocationDecider] [test-cluster-0] For index [[testresize_split/UqXRWqRfRNSrBFZ_Yhzqfw]] with [6] shards, Node [Ex1StRCOQwG_ghw3EJugoA] is expected to hold [3] shards for index [[testresize_split/UqXRWqRfRNSrBFZ_Yhzqfw]], based on the total of [2] nodes available. The configured load skew tolerance is [1.50], which yields an allocation threshold of Math.ceil([3] × [1.50]) = [5] shards. Currently, node [Ex1StRCOQwG_ghw3EJugoA] is assigned [5] shards of index [[testresize_split/UqXRWqRfRNSrBFZ_Yhzqfw]]. Therefore, assigning additional shards is not preferred.

[2025-10-16T04:55:59,560][DEBUG][o.e.c.r.a.d.IndexShardCountAllocationDecider] [test-cluster-0] For index [[testresize_split/UqXRWqRfRNSrBFZ_Yhzqfw]] with [6] shards, Node [Ex1StRCOQwG_ghw3EJugoA] is expected to hold [3] shards for index [[testresize_split/UqXRWqRfRNSrBFZ_Yhzqfw]], based on the total of [2] nodes available. The configured load skew tolerance is [1.50], which yields an allocation threshold of Math.ceil([3] × [1.50]) = [5] shards. Currently, node [Ex1StRCOQwG_ghw3EJugoA] is assigned [5] shards of index [[testresize_split/UqXRWqRfRNSrBFZ_Yhzqfw]]. Therefore, assigning additional shards is not preferred.

Code changes are needed so that, when a split is requested, the deciders use the proposed new primary shard count.

@ywangd
Member

ywangd commented Oct 16, 2025

It would be helpful if you could share the build scan link for the failed test so that we have clear context. Without it, I assume the failure is from FullClusterRestartIT.testResize.

this new shard count is not available in Cluster State

This is not true. The original index is created with 3 primaries and 1 replica. The error message you shared above says

For index [[testresize_split/UqXRWqRfRNSrBFZ_Yhzqfw]] with [6] shards,

Note that the 6 shards actually means 6 primary shards, which is double the original 3 due to the split. This is because the PR computes the number as

var totalShards = allocation.getClusterState().routingTable(ProjectId.DEFAULT).index(index).size();

This is counting the size of an IndexRoutingTable, which is determined by its array of IndexShardRoutingTable entries. Each IndexShardRoutingTable in turn manages all copies (primary and replicas) of the same shard. Therefore the total number of shards (primaries and replicas) should be IndexRoutingTable.size() * IndexShardRoutingTable.size() = 12. But you might as well use IndexMetadata.getTotalNumberOfShards() for it.
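To spell the arithmetic out with the numbers from this test (an illustration only; the concrete values are read off the log above):

    int indexRoutingTableSize = 6;   // IndexRoutingTable.size(): one entry per shard id, i.e. primaries (3 doubled by the split)
    int copiesPerShardId = 2;        // IndexShardRoutingTable.size(): primary + 1 replica
    int totalShardCopies = indexRoutingTableSize * copiesPerShardId; // 12, matching IndexMetadata.getTotalNumberOfShards()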

I think using the total number of shards makes sense for stateful. But in stateless, we should probably differentiate between primaries and replicas, since their corresponding nodes are two disjoint sets (not sure what the plan is for this PR; I can see this as a 2nd step).

Also, IIUC, the new decider does not return NO, so why would shards remain unassigned when it says NOT_PREFERRED? That does not sound right, especially for newly created shards.

Contributor

@nicktindall nicktindall left a comment


Looking good, a bunch of minor questions/comments from me

public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
    if (indexBalanceConstraintSettings.isDeciderEnabled() == false || isStateless == false || hasNoFilters() == false) {
        return Decision.single(Decision.Type.YES, NAME, "Decider is disabled.");
    }
Contributor


If it's disabled for stateful, I wonder if we could just configure it to be added in the stateless plugin? Then we wouldn't need to check isStateless every time.

Contributor


Also hasNoFilters() == false is harder to read than filtersAreConfigured() or similar. It's Friday here, I can't process a double-negative.

Contributor Author

@zhubotang-wq zhubotang-wq Nov 21, 2025


Refactored to remove the double negative. Given the follow-up plans to add stateful logic, and the protracted nature of this iteration, I am inclined to keep its current placement.

I completely agree that placing this in the stateless plugin is a much better choice than the current location. Since the decision to make this a stateless-only decider was made nearly two months ago, I'm a bit surprised this option didn't come up earlier in the previous dozens of comments.

Maybe the reviewers could concentrate more on the architectural and logic aspects here; those have a greater impact than variable-naming details, and raising them earlier would have let me take this approach much sooner.


if (node.node().getRoles().contains(SEARCH_ROLE) && shardRouting.primary()) {
    return Decision.single(Decision.Type.YES, NAME, "A search node cannot own primary shards. Decider inactive.");
}
Contributor


Could we perhaps combine this into a single check like co.elastic.elasticsearch.stateless.allocation.StatelessAllocationDecider#canAllocateShardToNode so it doesn't distract from the focus of this decider? (lines 84-92 could be replaced by such a check maybe?)

Contributor Author


Ack. In hindsight, the deciders ought to have been placed in the stateless repo as part of the stateless plugin.

As I mentioned earlier, this feedback makes absolute sense, since canAllocateShardToNode deals with precisely the requirement here.

At this stage, I am inclined to leave this refactoring to a follow-up PR when canRemain() is added.


final double idealAllocation = Math.ceil((double) totalShards / eligibleNodes.size());
final int threshold = (totalShards + eligibleNodes.size() - 1 + indexBalanceConstraintSettings.getExcessShards()) / eligibleNodes.size();
Contributor


I don't understand this formula; what am I missing? Does it deserve an explanation?

currentAllocation
);

logger.trace(explanation);
Contributor


Should this be logger.debug?

Contributor Author


For this logging statement, 4 different reviewers offered different opinions. At this stage, I am inclined to leave it unchanged.

addAllocationDecider(deciders, new ThrottlingAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new IndexBalanceAllocationDecider(settings, clusterSettings));
Contributor


As suggested above, is this something we could configure only in the serverless plugin?

Contributor Author


Ack. My previous replies agree this is sound advice. At this stage, I am inclined to leave this refactoring to follow-up iterations.

}

public void testCanAllocateUnderThresholdWithExcessShards() {
setup(false, true);
Contributor


Nit: I think this is hard to read, especially outside of an IDE (methods with consecutive boolean params like this). Can we just pass in the settings or similar, so the starting point is clear just by looking at the test? Or replace the booleans with enums or something.

Could make a settings builder or something to reduce repetition if you think it's worthwhile.

Contributor Author


Agree. Fixed to use settings and clearly named parameters.

nomenclature = "search";
}

assert eligibleNodes.isEmpty() == false;
Contributor


I wondered about this also: if the cluster were shutting down, all nodes would be marked as shutting down and we'd get an empty array here, right?

Contributor

@nicktindall nicktindall left a comment


I'm happy to approve once we're using allocation.decision; that's my only real comment that's not just opinion.

private List<RoutingNode> indexTier;
private List<RoutingNode> searchIier;

private void setup(Settings settings) {
Contributor


This isn't quite what I meant... what I was thinking is you could write utility methods for populating those settings e.g.

    public Settings addRandomFilterSetting(Settings settings) {
        String setting = randomFrom(
            CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX,
            CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX,
            CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX
        );
        String attribute = randomFrom("_value", "name");
        String name = randomFrom("indexNodeOne", "indexNodeTwo", "searchNodeOne", "searchNodeTwo");
        String ip = randomFrom("192.168.0.1", "192.168.0.2", "192.168.7.1", "10.17.0.1");
        return Settings.builder()
            .put(settings)
            .put(setting + "." + attribute, attribute.equals("name") ? name : ip)
            .build();
    }

    public Settings allowExcessShards(Settings settings) {
        int excessShards = randomIntBetween(1, 5);

        return Settings.builder()
            .put(settings)
            .put(IndexBalanceConstraintSettings.INDEX_BALANCE_DECIDER_EXCESS_SHARDS.getKey(), excessShards)
            .build();
    }

Then you could do e.g.

     Settings settings = addRandomFilterSetting(Settings.EMPTY);
     settings = allowExcessShards(settings);
     setup(settings);

     //...

Then you just use the provided settings as a starting point in the setup:

    Settings effectiveSettings = Settings.builder()
        .put(settings)
        .put("stateless.enabled", "true")
        .put(IndexBalanceConstraintSettings.INDEX_BALANCE_DECIDER_ENABLED_SETTING.getKey(), "true")
        .build();

That way the configuration of excess shards/random filters is done only in the places you need it, in the test itself rather than in the setup method?

Settings.builder().put(settings) makes a builder with all the settings in the passed settings object copied.

@zhubotang-wq
Contributor Author

I'm happy to approve once we're using allocation.decision, that's my only real comment that's not just opinion

Made the following fixes:

  • Decision.single has been replaced.
  • Added a defensive if block for eligibleNodes.isEmpty(); retained the assertion based on the previous discussion.
  • Added extra comments on

    final int threshold = (totalShards + eligibleNodes.size() - 1 + indexBalanceConstraintSettings.getExcessShards()) / eligibleNodes.size();

  • Integer division is simpler to reason about than double.
  • Adding eligibleNodes.size() - 1 ensures the threshold is not rounded down.
  • eligibleNodes.size() - 1 is not so large that the threshold gets an automatic extra shard of allowance without touching the excess-shards setting.

// The built-in "eligibleNodes.size() - 1" offers just enough buffer that the threshold is not rounded down by integer division,
// but not so much that the threshold gets an automatic extra shard of allowance.
final int threshold = (totalShards + eligibleNodes.size() - 1 + indexBalanceConstraintSettings.getExcessShards()) / eligibleNodes.size();
Contributor

@nicktindall nicktindall Nov 25, 2025


Can we put this in a function called ceilingDivision or something?

I see that it's a common way of doing integer ceiling division. There's also Math.ceilDiv(int, int) which I think would be even better. Leaving it as-is is just a bit mind-boggling unless you're familiar with the trick.

Contributor

@nicktindall nicktindall Nov 25, 2025


I really think we should use Math#ceilDiv; it has very detailed javadoc.

Contributor Author

@zhubotang-wq zhubotang-wq Nov 25, 2025


The current form of the calculation was suggested in earlier feedback.

#135875 (comment)

The reviewer's rationale is as follows:

Staying in int simplifies I think, though for common numbers, doubles do have exact precision, this helps me not speculate ;-). And adding the skew tolerance before division ensures that with tolerance 1 we get:

2 shards, 2 nodes, allow 2 on each
3 shards, 2 nodes, allow 2 on each (prior version 3, but there is already 1 shard wiggle room).
etc.

The threshold computation has gone through several iterations:

final int threshold = (int) Math.ceil(idealAllocation * indexBalanceConstraintSettings.getExcessShards());

final int threshold = (int) Math.ceil(idealAllocation) + indexBalanceConstraintSettings.getExcessShards();

final int threshold = (totalShards + eligibleNodes.size() - 1 + indexBalanceConstraintSettings.getExcessShards()) / eligibleNodes.size();

There’s a broad spectrum of perspectives on this topic. I’ve made an effort to incorporate as much feedback as possible, though aligning all perspectives has proven challenging. It’s been a bit difficult to understand the group’s decision-making dynamics in this process.

At this stage, I am inclined to keep it in its current form for this iteration.

Contributor

@nicktindall nicktindall Nov 25, 2025


I don't think using the integer ceilDiv function from java.lang.Math contradicts any of the prior advice. For me, we want a pretty good reason to rewrite logic that exists in the standard library, and I don't see that here. It's just a straightforward readability/maintainability thing.

I think I do feel strongly about this one.

final int threshold = Math.ceilDiv(totalShards + indexBalanceConstraintSettings.getExcessShards(), eligibleNodes.size());

is much easier to understand than

final int threshold = (totalShards + eligibleNodes.size() - 1 + indexBalanceConstraintSettings.getExcessShards()) / eligibleNodes.size();

Math.ceilDiv(int n, int d) seems to be equivalent to (n + d - 1) / d, except that the former is very nicely documented. I don't think @henningandersen would disagree?
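For the worked numbers quoted earlier (2 nodes, tolerance/excess of 1), the two forms do agree; a quick illustration, not code from the PR:

    int nodes = 2, excess = 1;
    for (int totalShards : new int[] { 2, 3 }) {
        int manual = (totalShards + nodes - 1 + excess) / nodes;    // (2+2-1+1)/2 = 2 and (3+2-1+1)/2 = 2
        int viaCeilDiv = Math.ceilDiv(totalShards + excess, nodes); // ceil(3/2) = 2 and ceil(4/2) = 2
        assert manual == viaCeilDiv;                                // both allow 2 shards per node
    }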

Contributor


Yeah, I agree with this; using a standard function is better.

Contributor

@nicktindall nicktindall left a comment


This LGTM

@zhubotang-wq zhubotang-wq merged commit b698d38 into elastic:main Nov 26, 2025
34 checks passed
@zhubotang-wq zhubotang-wq deleted the 12080-index-shard-count-allocation-decider branch November 26, 2025 18:04