
Conversation

@pxsalehi (Member) commented Sep 10, 2025

This is needed for a new Stateless decider that limits concurrent recoveries based on node heap size.

Relates ES-12554

@pxsalehi pxsalehi added the >non-issue and :Distributed Coordination/Allocation (All issues relating to the decision making around placing a shard, both master logic & on the nodes) labels on Sep 10, 2025
@pxsalehi pxsalehi force-pushed the ps250908-limitConcRecoveries branch from a538165 to d83e803 on September 10, 2025 09:23
Comment on lines 169 to 170
if (shardRouting.isPromotableToPrimary()
&& shardRouting.isSearchable() == false
@pxsalehi (Member Author):

I've chosen to do this so that it only impacts primary relocation in stateless, since that is the only place where this is needed. I didn't want to complicate the stateful flow, as there is no need for it there.

@pxsalehi (Member Author):

There could also be different ways of doing this, e.g. passing Settings to the decider and using STATELESS_ENABLED. I did it this way because in other places, e.g. when deciding whether to use TransportStatelessPrimaryRelocationAction, we do the same kind of check on the shard.
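
For illustration, the kind of shard-level check being referred to could be factored out like this (helper name is hypothetical; an index-only shard, i.e. promotable to primary but not searchable, only exists in stateless):

// Hypothetical helper; mirrors the kind of shard check used when choosing
// TransportStatelessPrimaryRelocationAction, as mentioned above.
private static boolean isStatelessIndexShard(ShardRouting shardRouting) {
    // promotable-to-primary but not searchable => an index-only (stateless) shard copy
    return shardRouting.isPromotableToPrimary() && shardRouting.isSearchable() == false;
}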

@nicktindall (Contributor):

I wonder if we could do this as a completely separate decider that's added by the Stateless plugin? It might be nice to keep it separate from the existing throttling logic, since it's just an additional constraint, right? I don't think there's any reason it needs to be in here.

@pxsalehi (Member Author):

It's an interesting idea, but this seems too niche for a separate decider. It seems very relevant to where it is now, and if we decide to apply the same limitation to stateful it can be done here. Frankly, I don't have a strong argument for either option. It seemed like a small enough change to add it to the existing decider to begin with. We might also change or remove it if we go in the direction of defining some relationship between concurrent relocations and node size.
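
For reference, a minimal sketch of the separate-decider idea (which the PR later adopts); the class name, decider name, and messages are hypothetical, and only the shard check and the incoming-recoveries lookup come from this PR:

import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;

// Hypothetical standalone decider: adds an extra constraint on top of the existing
// ThrottlingAllocationDecider instead of changing it.
public class StatelessPrimaryRecoveryThrottlingDecider extends AllocationDecider {
    private static final String NAME = "stateless_primary_recovery_throttling"; // assumed name

    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        // Only index-only (stateless) primaries are subject to the extra limit.
        if (shardRouting.isPromotableToPrimary() == false || shardRouting.isSearchable()) {
            return allocation.decision(Decision.YES, NAME, "not a stateless primary recovery");
        }
        int incoming = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
        int limit = limitForNode(node, allocation); // heap-based limit, see the discussion below
        return incoming >= limit
            ? allocation.decision(Decision.THROTTLE, NAME, "too many incoming recoveries [%d >= %d]", incoming, limit)
            : allocation.decision(Decision.YES, NAME, "below the concurrent recovery limit");
    }

    private static int limitForNode(RoutingNode node, RoutingAllocation allocation) {
        return 2; // placeholder; the actual limit is out of scope for this sketch
    }
}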

@pxsalehi pxsalehi marked this pull request as ready for review September 10, 2025 10:06
@pxsalehi pxsalehi requested a review from a team as a code owner September 10, 2025 10:06
@elasticsearchmachine elasticsearchmachine added the Team:Distributed Coordination (Meta label for Distributed Coordination team) label on Sep 10, 2025
@elasticsearchmachine (Collaborator):

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

@nicktindall (Contributor) left a comment:

Looks good, just a question about whether we should split this out into its own decider.


// Allocating a shard to this node will increase the incoming recoveries
int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
if (currentInRecoveries >= concurrentIncomingRecoveries) {
@mhl-b (Contributor) commented Sep 11, 2025:

Should we change concurrentIncomingRecoveries to be a function of maxHeapSize? There are projects with a large number of shards (10k+ per node) and large heaps that take a long time (1h+) to relocate.

Something like

int concurrentRecoveriesThreshold = concurrentIncomingRecoveries;
if (dynamicConcurrentRecoveriesSetting) {
  concurrentRecoveriesThreshold = fn(maxHeap);
}
if (currentInRecoveries >= concurrentRecoveriesThreshold) {
...

As a heuristic I would use 1 recovery for every 2GB of max heap. That would solve the problem for 4GB (2GB heap, 1 recovery) and 64GB (32GB heap, 16 recoveries) nodes as well.
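
If that heuristic were adopted, the fn(maxHeap) above could look roughly like this (method name hypothetical):

import org.elasticsearch.common.unit.ByteSizeValue;

// Hypothetical: derive the concurrent-recovery threshold from the node's max heap,
// roughly one recovery per 2GB of heap, never less than one.
static int recoveriesForHeap(ByteSizeValue maxHeap) {
    long perRecovery = ByteSizeValue.ofGb(2).getBytes();
    return Math.max(1, Math.toIntExact(maxHeap.getBytes() / perRecovery));
}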

@pxsalehi (Member Author):

Once we decide that's what we want and how to do it, we can extend the new stateless decider.

@henningandersen (Contributor) left a comment:

I am slightly worried about adding a setting that we may want to remove again, and also that it is settable for non-stateless. Otherwise I am good with this, if we can find a way around that to avoid any BWC implications of removing the setting.

Setting.memorySizeSetting(
"cluster.routing.allocation.min_heap_required_for_concurrent_primary_recoveries",
ByteSizeValue.ZERO,
Property.Dynamic,
Contributor:

Can we make it OperatorDynamic? (Seems more appropriate, but it may not matter too much.)
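
i.e. roughly the following (property list assumed; only the setting name and default come from the diff above):

Setting.memorySizeSetting(
    "cluster.routing.allocation.min_heap_required_for_concurrent_primary_recoveries",
    ByteSizeValue.ZERO,
    Setting.Property.OperatorDynamic, // operator-only dynamic updates instead of plain Dynamic
    Setting.Property.NodeScope
);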

ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_MIN_HEAP_REQUIRED_FOR_CONCURRENT_PRIMARY_RECOVERIES_SETTING,
Contributor:

I wonder if we can register this in stateless only (and in tests), to avoid any BWC implications if we end up removing this again and replacing it with a more elaborate mechanism. Hmm, that might make any lookup fail, which complicates things.

Perhaps we can "bomb" the validation by only allowing this when running stateless?

@pxsalehi (Member Author):

I'd rather go with Nick's suggestion. I've updated the PR.

@pxsalehi pxsalehi changed the title from "Make concurrent primary recoveries dependant on heap size" to "Expose node heap size in cluster info" on Sep 12, 2025
@pxsalehi (Member Author):

This PR now only exposes the node max heap size in cluster info. I've added a new stateless decider that does the throttling.
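
For context, a rough sketch of how a decider could consume the newly exposed value (the null-fallback behavior is an assumption):

// Hypothetical usage inside an allocation decider: look up the target node's max heap
// from the ClusterInfo that this PR now populates.
ByteSizeValue maxHeap = allocation.clusterInfo().getMaxHeapSizePerNode().get(node.nodeId());
if (maxHeap == null) {
    // heap stats not collected yet for this node; fall back to the static limit
}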

* This method returns the corresponding initializing shard that would be allocated to this node.
*/
- private static ShardRouting initializingShard(ShardRouting shardRouting, String currentNodeId) {
+ public static ShardRouting initializingShard(ShardRouting shardRouting, String currentNodeId) {
@pxsalehi (Member Author):

This is needed to do the same assertion that is done in this decider, but in the stateless decider.

@henningandersen (Contributor) left a comment:

LGTM.

nodeThreadPoolUsageStatsPerNode,
- indicesStatsSummary.shardWriteLoads()
+ indicesStatsSummary.shardWriteLoads(),
+ maxHeapPerNode
Contributor:

nit: maxHeapPerNode is a volatile field. While it will not matter in the current work, I would find it slightly better to capture the value of maxHeapPerNode once in this method, so that maxHeapPerNode and the estimatedHeapUsages are guaranteed to be created from the same maxHeapPerNode instance.
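
i.e. roughly (local variable name assumed):

// Read the volatile field once so both derived structures use the same snapshot.
final Map<String, ByteSizeValue> maxHeapPerNode = this.maxHeapPerNode; // single volatile read
// ... estimatedHeapUsages and the ClusterInfo are then built from this local ...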

@pxsalehi (Member Author):

sure. 75f7cd0.

return result == null ? ReservedSpace.EMPTY : result;
}

public Map<String, ByteSizeValue> getMaxHeapSizePerNode() {
Contributor:

Can we add a test similar to IndexShardIT.testHeapUsageEstimateIsPresent? We can postpone it to follow-up work if that matches the timing better.
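
Such a test could look roughly like this (modeled on the heap-usage-estimate test mentioned above rather than copied from it; test name and helper usage are assumptions):

public void testMaxHeapSizePerNodeIsPresent() {
    // Hypothetical sketch: refresh cluster info, then assert every node reports a max heap size.
    InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
    ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(clusterInfoService);
    for (DiscoveryNode node : getInstanceFromNode(ClusterService.class).state().nodes()) {
        assertNotNull("expected a max heap size for node " + node.getId(),
            clusterInfo.getMaxHeapSizePerNode().get(node.getId()));
    }
}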

@pxsalehi (Member Author):

Added a test in 44dcc78.

@DaveCTurner (Contributor) left a comment:

LGTM2

final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools;
final Map<ShardId, Double> shardWriteLoads;
final Map<String, ByteSizeValue> maxHeapSizePerNode;
Contributor:

nit: not really a problem introduced here but it'd be nice to indicate what the opaque String keys are -- presumably persistent node IDs?
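
The fix could be as small as a field-level comment, e.g. (assuming the keys are indeed node IDs, as presumed above):

/** Max heap size per node, keyed by the persistent node ID (DiscoveryNode#getId()). */
final Map<String, ByteSizeValue> maxHeapSizePerNode;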

@pxsalehi (Member Author):

Added a comment in 44dcc78.

@pxsalehi pxsalehi added the auto-merge-without-approval (Automatically merge pull request when CI checks pass; NB doesn't wait for reviews!) label on Sep 12, 2025
@elasticsearchmachine elasticsearchmachine merged commit 4db0011 into elastic:main Sep 12, 2025
34 checks passed
@pxsalehi pxsalehi deleted the ps250908-limitConcRecoveries branch September 12, 2025 13:09
gmjehovich pushed a commit to gmjehovich/elasticsearch that referenced this pull request Sep 18, 2025
This is needed for a new Stateless decider that limits concurrent
recoveries based on node heap size.

Relates ES-12554
Labels: auto-merge-without-approval, :Distributed Coordination/Allocation, >non-issue, Team:Distributed Coordination, v9.2.0