Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,6 @@ tests:
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_start_stop/Test stop transform with force and wait_for_checkpoint true}
issue: https://github.com/elastic/elasticsearch/issues/135135
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testSearchableSnapshotUpgrade {p0=[9.2.0, 9.2.0, 9.2.0]}
issue: https://github.com/elastic/elasticsearch/issues/135150
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testMountSearchableSnapshot {p0=[9.2.0, 9.2.0, 9.2.0]}
issue: https://github.com/elastic/elasticsearch/issues/135151
- class: org.elasticsearch.discovery.ClusterDisruptionIT
method: testAckedIndexing
issue: https://github.com/elastic/elasticsearch/issues/117024
Expand Down Expand Up @@ -591,27 +585,12 @@ tests:
- class: org.elasticsearch.xpack.esql.expression.function.scalar.score.DecayTests
method: "testEvaluateBlockWithoutNulls {TestCase=<date_nanos>, <date_nanos>, <time_duration>, <_source> #12}"
issue: https://github.com/elastic/elasticsearch/issues/135394
- class: org.elasticsearch.upgrades.DataStreamsUpgradeIT
method: testDataStreamValidationDoesNotBreakUpgrade
issue: https://github.com/elastic/elasticsearch/issues/135406
- class: org.elasticsearch.upgrades.IndexingIT
method: testIndexing
issue: https://github.com/elastic/elasticsearch/issues/135407
- class: org.elasticsearch.upgrades.QueryableBuiltInRolesUpgradeIT
method: testBuiltInRolesSyncedOnClusterUpgrade
issue: https://github.com/elastic/elasticsearch/issues/135194
- class: org.elasticsearch.gradle.TestClustersPluginFuncTest
method: override jdk usage via ES_JAVA_HOME for known jdk os incompatibilities
issue: https://github.com/elastic/elasticsearch/issues/135413
- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT
method: test {csv-spec:spatial_shapes.ConvertCartesianShapeFromStringParseError}
issue: https://github.com/elastic/elasticsearch/issues/135455
- class: org.elasticsearch.upgrades.SearchableSnapshotsRollingUpgradeIT
method: testBlobStoreCacheWithPartialCopyInMixedVersions
issue: https://github.com/elastic/elasticsearch/issues/135473
- class: org.elasticsearch.upgrades.SearchableSnapshotsRollingUpgradeIT
method: testBlobStoreCacheWithFullCopyInMixedVersions
issue: https://github.com/elastic/elasticsearch/issues/135474
- class: org.elasticsearch.xpack.esql.qa.multi_node.GenerativeIT
method: test
issue: https://github.com/elastic/elasticsearch/issues/134407
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,25 +140,83 @@ public BalancedShardsAllocator(

@Override
public void allocate(RoutingAllocation allocation) {
assert allocation.isSimulating() == false || balancerSettings.completeEarlyOnShardAssignmentChange()
: "inconsistent states: isSimulating ["
+ allocation.isSimulating()
+ "] vs completeEarlyOnShardAssignmentChange ["
+ balancerSettings.completeEarlyOnShardAssignmentChange()
+ "]";
if (allocation.metadata().hasAnyIndices()) {
// must not use licensed features when just starting up
writeLoadForecaster.refreshLicense();
}

assert allocation.ignoreDisable() == false;
assert allocation.isSimulating() == false || allocation.routingNodes().hasInactiveShards() == false
: "expect no initializing shard, but got " + allocation.routingNodes();
// TODO: ES-12943 cannot assert the following because shards moved by commands are not simulated promptly in DesiredBalanceComputer
// assert allocation.isSimulating() == false || allocation.routingNodes().getRelocatingShardCount() == 0
// : "expect no relocating shard, but got " + allocation.routingNodes();

if (allocation.routingNodes().size() == 0) {
failAllocationOfNewPrimaries(allocation);
return;
}
final BalancingWeights balancingWeights = balancingWeightsFactory.create();
final Balancer balancer = new Balancer(writeLoadForecaster, allocation, balancerSettings.getThreshold(), balancingWeights);
balancer.allocateUnassigned();
balancer.moveShards();
balancer.balance();
final Balancer balancer = new Balancer(
writeLoadForecaster,
allocation,
balancerSettings.getThreshold(),
balancingWeights,
balancerSettings.completeEarlyOnShardAssignmentChange()
);

boolean shardAssigned = false, shardMoved = false, shardBalanced = false;
try {
shardAssigned = balancer.allocateUnassigned();
if (shardAssigned && balancerSettings.completeEarlyOnShardAssignmentChange()) {
return;
}

// Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation);
shardMoved = balancer.moveShards();
if (shardMoved && balancerSettings.completeEarlyOnShardAssignmentChange()) {
return;
}

shardBalanced = balancer.balance();
} finally {
if (logger.isDebugEnabled()) {
logger.debug(
"shards assigned: {}, shards moved: {}, shards balanced: {}, "
+ "routingNodes hasInactiveShards [{}], relocation count [{}]",
shardAssigned,
shardMoved,
shardBalanced,
allocation.routingNodes().hasInactiveShards(),
allocation.routingNodes().getRelocatingShardCount()
);
}
assert assertShardAssignmentChanges(allocation, shardAssigned, shardMoved, shardBalanced);
// Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation);
}
}

/**
 * Sanity-checks that the flags reported by the balancing steps agree with the state of the routing nodes.
 * The checks only apply when the allocation is simulating; otherwise nothing is verified.
 * <p>
 * The method always returns {@code true} so that it can be invoked from within an {@code assert} statement.
 *
 * @param allocation    the allocation whose routing nodes are inspected
 * @param shardAssigned whether the allocate-unassigned step reported a shard assignment
 * @param shardMoved    whether the move-shards step reported a shard movement
 * @param shardBalanced whether the balance step reported a shard movement
 * @return always {@code true}
 */
private boolean assertShardAssignmentChanges(
    RoutingAllocation allocation,
    boolean shardAssigned,
    boolean shardMoved,
    boolean shardBalanced
) {
    if (allocation.isSimulating()) {
        // An assignment must leave at least one inactive (initializing) shard behind.
        assert shardAssigned == false || allocation.routingNodes().hasInactiveShards()
            : "expect initializing shard, but got " + allocation.routingNodes();

        // A move or a rebalance must leave at least one relocating shard behind.
        final boolean relocationExpected = shardMoved || shardBalanced;
        assert relocationExpected == false || allocation.routingNodes().getRelocatingShardCount() > 0
            : "expect relocating shard, but got " + allocation.routingNodes();
    }
    return true;
}

private void collectAndRecordNodeWeightStats(Balancer balancer, BalancingWeights balancingWeights, RoutingAllocation allocation) {
Expand Down Expand Up @@ -188,7 +246,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
writeLoadForecaster,
allocation,
balancerSettings.getThreshold(),
balancingWeightsFactory.create()
balancingWeightsFactory.create(),
balancerSettings.completeEarlyOnShardAssignmentChange()
);
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
Expand Down Expand Up @@ -248,12 +307,14 @@ public static class Balancer {
private final Map<String, ModelNode> nodes;
private final BalancingWeights balancingWeights;
private final NodeSorters nodeSorters;
private final boolean completeEarlyOnShardAssignmentChange;

private Balancer(
WriteLoadForecaster writeLoadForecaster,
RoutingAllocation allocation,
float threshold,
BalancingWeights balancingWeights
BalancingWeights balancingWeights,
boolean completeEarlyOnShardAssignmentChange
) {
this.writeLoadForecaster = writeLoadForecaster;
this.allocation = allocation;
Expand All @@ -266,6 +327,7 @@ private Balancer(
nodes = Collections.unmodifiableMap(buildModelFromAssigned());
this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this);
this.balancingWeights = balancingWeights;
this.completeEarlyOnShardAssignmentChange = completeEarlyOnShardAssignmentChange;
}

private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) {
Expand Down Expand Up @@ -358,7 +420,7 @@ private IndexMetadata indexMetadata(ProjectIndex index) {
* Balances the nodes on the cluster model according to the weight function.
* The actual balancing is delegated to {@link #balanceByWeights(NodeSorter)}
*/
private void balance() {
private boolean balance() {
if (logger.isTraceEnabled()) {
logger.trace("Start balancing cluster");
}
Expand All @@ -371,21 +433,27 @@ private void balance() {
* Therefore we only do a rebalance if we have fetched all information.
*/
logger.debug("skipping rebalance due to in-flight shard/store fetches");
return;
return false;
}
if (allocation.deciders().canRebalance(allocation).type() != Type.YES) {
logger.trace("skipping rebalance as it is disabled");
return;
return false;
}

boolean shardBalanced = false;
// Balance each partition
for (NodeSorter nodeSorter : nodeSorters) {
if (nodeSorter.modelNodes.length < 2) { /* skip if we only have one node */
logger.trace("skipping rebalance as the partition has single node only");
continue;
}
balanceByWeights(nodeSorter);
shardBalanced |= balanceByWeights(nodeSorter);
// TODO: We could choose to account shardBalanced separately for each partition since they do not overlap.
if (shardBalanced && completeEarlyOnShardAssignmentChange) {
return true;
}
}
return shardBalanced;
}

/**
Expand Down Expand Up @@ -531,7 +599,8 @@ private MoveDecision decideRebalance(final ProjectIndex index, final ShardRoutin
* only, or in other words relocations that move the weight delta closer
* to {@code 0.0}
*/
private void balanceByWeights(NodeSorter sorter) {
private boolean balanceByWeights(NodeSorter sorter) {
boolean shardBalanced = false;
final AllocationDeciders deciders = allocation.deciders();
final ModelNode[] modelNodes = sorter.modelNodes;
final float[] weights = sorter.weights;
Expand Down Expand Up @@ -630,6 +699,18 @@ private void balanceByWeights(NodeSorter sorter) {
sorter.sort(0, relevantNodes);
lowIdx = 0;
highIdx = relevantNodes - 1;

if (routingNodes.getRelocatingShardCount() > 0) {
// ES-12955: Check routingNodes.getRelocatingShardCount() > 0 in case the first relocation is a THROTTLE.
// This should rarely happen since in most cases, we don't throttle unless there is an existing relocation.
// But it can happen in production for frozen indices when the cache is still being prepared. It can also
// happen in tests because we have decider like RandomAllocationDecider that can randomly return THROTTLE
// when there is no existing relocation.
shardBalanced = true;
}
if (completeEarlyOnShardAssignmentChange && shardBalanced) {
return true;
}
Comment on lines +703 to +713
Copy link
Member Author

@ywangd ywangd Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix. See also 70b99de for its commit.

There were two issues:

  1. The main one is that we should only flag shardBalanced as true when there is shard movement on the cluster, not just on the model node.
  2. The second issue is that in production, balancing can actually be throttled without any prior shard movement because of HasFrozenCacheAllocationDecider. I updated the comment to reflect it.

continue;
}
}
Expand All @@ -651,6 +732,7 @@ private void balanceByWeights(NodeSorter sorter) {
}
}
}
return shardBalanced;
}

/**
Expand Down Expand Up @@ -721,7 +803,8 @@ protected int comparePivot(int j) {
* shard is created with an incremented version in the state
* {@link ShardRoutingState#INITIALIZING}.
*/
public void moveShards() {
public boolean moveShards() {
boolean shardMoved = false;
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards.
Expand All @@ -745,10 +828,15 @@ public void moveShards() {
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
}
shardMoved = true;
if (completeEarlyOnShardAssignmentChange) {
return true;
}
} else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
}
}
return shardMoved;
}

/**
Expand Down Expand Up @@ -888,14 +976,14 @@ private Map<String, ModelNode> buildModelFromAssigned() {
* Allocates all given shards on the minimal eligible node for the shards index
* with respect to the weight function. All given shards must be unassigned.
*/
private void allocateUnassigned() {
private boolean allocateUnassigned() {
RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
assert nodes.isEmpty() == false;
if (logger.isTraceEnabled()) {
logger.trace("Start allocating unassigned shards");
}
if (unassigned.isEmpty()) {
return;
return false;
}

/*
Expand Down Expand Up @@ -932,6 +1020,7 @@ private void allocateUnassigned() {
int secondaryLength = 0;
int primaryLength = primary.length;
ArrayUtil.timSort(primary, comparator);
boolean shardAssignmentChanged = false;
do {
for (int i = 0; i < primaryLength; i++) {
ShardRouting shard = primary[i];
Expand All @@ -949,6 +1038,7 @@ private void allocateUnassigned() {

final long shardSize = getExpectedShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation);
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
shardAssignmentChanged = true;
minNode.addShard(index, shard);
if (shard.primary() == false) {
// copy over the same replica shards to the secondary array so they will get allocated
Expand All @@ -972,6 +1062,9 @@ private void allocateUnassigned() {
assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED;
final long shardSize = getExpectedShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation);
minNode.addShard(projectIndex(shard), shard.initialize(minNode.getNodeId(), null, shardSize));
// If we see a throttle decision in simulation, there must be other shards that got assigned before it.
assert allocation.isSimulating() == false || shardAssignmentChanged
: "shard " + shard + " was throttled but no other shards were assigned";
} else {
if (logger.isTraceEnabled()) {
logger.trace("No Node found to assign shard [{}]", shard);
Expand All @@ -994,6 +1087,7 @@ private void allocateUnassigned() {
secondaryLength = 0;
} while (primaryLength > 0);
// clear everything we have either added it or moved to ignoreUnassigned
return shardAssignmentChanged;
}

private ProjectIndex projectIndex(ShardRouting shardRouting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;

Expand All @@ -26,6 +27,7 @@ public class BalancerSettings {
private volatile float writeLoadBalanceFactor;
private volatile float diskUsageBalanceFactor;
private volatile float threshold;
private final boolean completeEarlyOnShardAssignmentChange;

public BalancerSettings(Settings settings) {
this(ClusterSettings.createBuiltInClusterSettings(settings));
Expand All @@ -37,6 +39,9 @@ public BalancerSettings(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(WRITE_LOAD_BALANCE_FACTOR_SETTING, value -> this.writeLoadBalanceFactor = value);
clusterSettings.initializeAndWatch(DISK_USAGE_BALANCE_FACTOR_SETTING, value -> this.diskUsageBalanceFactor = value);
clusterSettings.initializeAndWatch(THRESHOLD_SETTING, value -> this.threshold = value);
this.completeEarlyOnShardAssignmentChange = ClusterModule.DESIRED_BALANCE_ALLOCATOR.equals(
clusterSettings.get(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING)
);
}

/**
Expand Down Expand Up @@ -67,4 +72,8 @@ public float getDiskUsageBalanceFactor() {
public float getThreshold() {
return threshold;
}

/**
 * Returns the flag computed once at construction time: {@code true} when the configured
 * {@code ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING} equals {@code ClusterModule.DESIRED_BALANCE_ALLOCATOR}.
 *
 * @return whether balancing should complete early once a shard assignment change has been made
 */
public boolean completeEarlyOnShardAssignmentChange() {
    return this.completeEarlyOnShardAssignmentChange;
}
}
Loading