
Remove recovery threadpools and throttle outgoing recoveries on the master #15372

Merged
Merged 3 commits into elastic:master from s1monw:trash_recovery_threads on Dec 28, 2015

Conversation

4 participants
@s1monw
Contributor

commented Dec 10, 2015

Today we throttle only incoming recoveries. Nodes that hold a lot of primaries
can get overloaded by too many recoveries. So far we have kept that at bay by
limiting the number of threads that send files to the target.

The right solution is to also throttle the outgoing recoveries, which are today unbounded on
the master, and to not start a recovery until there are enough resources on both the source and target nodes.

The concurrency aspects of the recovery source also added a lot of complexity and additional threadpools that are hard to configure. This commit removes the concurrent-streams notion completely and sends files on the thread that drives the recovery, simplifying the recovery code considerably.
Outgoing recoveries are now throttled on the master via an allocation decider.

Note: this PR is still a bit rough but all tests pass and it contains a lot of improvements. I just wanted to get a pair of eyes on it for initial feedback.
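The master-side throttle described above boils down to a per-node decision over two counters and two limits. The following is an illustrative sketch, not the actual `ThrottlingAllocationDecider` code; the class name, method signature, and messages are invented for demonstration.

```java
// Hypothetical sketch of the incoming/outgoing recovery throttle decision.
// Real Elasticsearch deciders return a Decision object against a RoutingNode;
// here plain ints and a String stand in for that machinery.
public class RecoveryThrottleSketch {

    // Decide whether another recovery may start on a node, given its current
    // outgoing/incoming recovery counts and the configured limits.
    public static String decide(int currentOut, int maxOut, int currentIn, int maxIn) {
        if (currentOut >= maxOut) {
            return String.format(
                "THROTTLE: too many outgoing recoveries [%d], limit: [%d]", currentOut, maxOut);
        } else if (currentIn >= maxIn) {
            return String.format(
                "THROTTLE: too many incoming recoveries [%d], limit: [%d]", currentIn, maxIn);
        } else {
            return String.format(
                "YES: below outgoing limit [%d] and incoming limit [%d]", maxOut, maxIn);
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(2, 2, 0, 2)); // outgoing limit reached -> throttled
        System.out.println(decide(1, 2, 1, 2)); // under both limits -> allowed
    }
}
```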

@dakrone


core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java Outdated

private void addRecovery(ShardRouting routing, int howMany, boolean initializing) {
assert routing.initializing() : "routing must be initializing: " + routing;
assert howMany == -1 || howMany == 1 : howMany + " != math.abs(1)";

@dakrone

dakrone Dec 10, 2015

Member

If howMany can only be -1 or 1, why not make it a boolean increment/decrement flag?
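The suggested refactor could look roughly like this. It is a hypothetical sketch with invented names, not code from the PR: the `-1`/`1` int parameter becomes a boolean flag that makes the increment/decrement intent explicit at the call site.

```java
// Sketch of dakrone's suggestion: replace howMany (restricted to -1 or 1)
// with a boolean increment/decrement flag on the recovery counter.
public class RecoveryCounterSketch {
    private int recoveries = 0;

    // increment == true adds one recovery, increment == false removes one.
    void addRecovery(boolean increment) {
        recoveries += increment ? 1 : -1;
        assert recoveries >= 0 : "recovery count must not go negative: " + recoveries;
    }

    int count() {
        return recoveries;
    }

    public static void main(String[] args) {
        RecoveryCounterSketch c = new RecoveryCounterSketch();
        c.addRecovery(true);
        c.addRecovery(true);
        c.addRecovery(false);
        System.out.println(c.count()); // prints 1
    }
}
```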

@dakrone


...n/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java Outdated
@@ -75,7 +81,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
if (shardRouting.primary()) {
assert shardRouting.unassigned() || shardRouting.active();
if (shardRouting.unassigned()) {
// primary is unassigned, means we are going to do recovery from gateway
// primary is unassigned, means we are goingx to do recovery from gateway

@dakrone

dakrone Dec 10, 2015

Member

minor typo here

@dakrone


...n/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java Outdated
int currentOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(node.nodeId());
int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
if (currentOutRecoveries >= concurrentOutgoingRecoveries) {
return allocation.decision(Decision.THROTTLE, NAME, "too many shards currently recovering [%d], limit: [%d]",

@dakrone

dakrone Dec 10, 2015

Member

Can you add the word "outgoing" to the description so we can differentiate it with the incoming message?

@dakrone


...n/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java Outdated
if (currentOutRecoveries >= concurrentOutgoingRecoveries) {
return allocation.decision(Decision.THROTTLE, NAME, "too many shards currently recovering [%d], limit: [%d]",
currentOutRecoveries, concurrentOutgoingRecoveries);
} else if (currentInRecoveries >= concurrentIncomingRecoveries) {
return allocation.decision(Decision.THROTTLE, NAME, "too many shards currently recovering [%d], limit: [%d]",

@dakrone

dakrone Dec 10, 2015

Member

Same here for adding "incoming" to the message somewhere

@dakrone


...n/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java Outdated
return allocation.decision(Decision.YES, NAME, "below shard recovery limit of [%d]", concurrentRecoveries);
currentInRecoveries, concurrentIncomingRecoveries);
} else {
return allocation.decision(Decision.YES, NAME, "below shard recovery limit of [%d]", concurrentOutgoingRecoveries);

@dakrone

dakrone Dec 10, 2015

Member

Should probably mention both limits here, or else it could be confusing if they are different values
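A YES message that reports both limits, as suggested here, removes the ambiguity when the two settings differ. This is an illustrative sketch of the wording only, with an invented helper, not the actual decider code.

```java
// Sketch of a YES decision message that names both the outgoing and the
// incoming recovery limit, so readers can tell which setting applied.
public class BothLimitsMessageSketch {

    static String yesMessage(int outLimit, int inLimit) {
        return String.format(
            "below outgoing recovery limit [%d] and incoming recovery limit [%d]",
            outLimit, inLimit);
    }

    public static void main(String[] args) {
        System.out.println(yesMessage(4, 2));
    }
}
```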

@dakrone


...n/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java Outdated
int concurrentRecoveries = settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, ThrottlingAllocationDecider.this.concurrentOutgoingRecoveries);
if (concurrentRecoveries != ThrottlingAllocationDecider.this.concurrentOutgoingRecoveries) {
logger.info("updating [cluster.routing.allocation.node_concurrent_recoveries] from [{}] to [{}]", ThrottlingAllocationDecider.this.concurrentOutgoingRecoveries, concurrentRecoveries);
ThrottlingAllocationDecider.this.concurrentOutgoingRecoveries = concurrentRecoveries;

@dakrone

dakrone Dec 10, 2015

Member

Only outgoing recovery limit is configurable? I think the incoming recovery limit needs code here too?

@dakrone


...n/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java Outdated
this.concurrentOutgoingRecoveries = settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_CONCURRENT_OUTGOING_RECOVERIES, concurrentRecoveriesDefault);
this.concurrentIncomingRecoveries = settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_CONCURRENT_INCOMING_RECOVERIES, concurrentRecoveriesDefault);

logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentOutgoingRecoveries, primariesInitialRecoveries);

@dakrone

dakrone Dec 10, 2015

Member

Should we log incoming recoveries setting here too?

@dakrone


core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java Outdated

ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();

logger.info("start one node, do reroute, only 3 should initialize");

@dakrone

dakrone Dec 10, 2015

Member

3 -> 5

@dakrone


core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java Outdated
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

logger.info("start another node, replicas should start being allocated");

@dakrone

dakrone Dec 10, 2015

Member

?? this index has no replicas

assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 3);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 2);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 5);

@dakrone

dakrone Dec 10, 2015

Member

Being able to assert this is really great, thanks for adding it!
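The bookkeeping those asserts exercise can be pictured as two per-node counters: each initializing replica adds one outgoing recovery on the source node and one incoming recovery on the target node. The following is a map-based sketch with invented names, not the `RoutingNodes` implementation.

```java
// Illustrative per-node recovery bookkeeping: one incoming counter on the
// target node and one outgoing counter on the source node per recovery.
import java.util.HashMap;
import java.util.Map;

public class RecoveryBookkeepingSketch {
    private final Map<String, Integer> incoming = new HashMap<>();
    private final Map<String, Integer> outgoing = new HashMap<>();

    // Record one shard recovery streaming from sourceNode to targetNode.
    void startRecovery(String sourceNode, String targetNode) {
        outgoing.merge(sourceNode, 1, Integer::sum);
        incoming.merge(targetNode, 1, Integer::sum);
    }

    int getIncomingRecoveries(String node) { return incoming.getOrDefault(node, 0); }
    int getOutgoingRecoveries(String node) { return outgoing.getOrDefault(node, 0); }

    public static void main(String[] args) {
        RecoveryBookkeepingSketch rn = new RecoveryBookkeepingSketch();
        // five replicas recovering from primaries on node1, spread over node2/node3
        for (int i = 0; i < 3; i++) rn.startRecovery("node1", "node2");
        for (int i = 0; i < 2; i++) rn.startRecovery("node1", "node3");
        System.out.println(rn.getOutgoingRecoveries("node1")); // prints 5
    }
}
```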

@dakrone

Member

commented Dec 10, 2015

I really like how this cleans up the RecoverySourceHandler!

One thing I don't understand is how many times the sendFiles(...) method will actually be called concurrently? I'm not familiar enough with the code to know exactly where it is called. Is it going to be called sequentially for every recovery operation now, with the number of recovery operations limited by the new throttling mechanism?

@nik9000

Contributor

commented Dec 10, 2015

Nodes that have a lot
of primaries can get overloaded due to too many recoveries.

I've certainly noticed this in the past.

@s1monw

Contributor Author

commented Dec 11, 2015

One thing I don't understand is how many times the sendFiles(...) method will actually be called concurrently? I'm not familiar enough with the code to know exactly where it is called. Is it going to be called sequentially for every recovery operation now, with the number of recovery operations limited by the new throttling mechanism?

Yeah, so previously we had one thread driving the recovery on the source; it passed all file-sending tasks to the threadpools and then blocked until all files were transferred. Now we don't have the threadpools and just use that driver thread to send all the files sequentially. We only use throttling to limit how fast we are sending. With the changes on the allocation end we also limit how many senders we have per node so we don't overload the node. But essentially we use a single thread for this IO-bound problem instead of multiple.
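The single-driver-thread shape described here can be sketched as a loop that sends files smallest-first and sleeps when it gets ahead of a byte-rate budget. This is an assumption-laden illustration, not the actual `RecoverySourceHandler`: the class name, the rate constant, and the sleep-based throttle are all invented for demonstration.

```java
// Sketch of sequential, rate-limited file sending on one driver thread.
public class SequentialSenderSketch {
    static final long MAX_BYTES_PER_SEC = 40L * 1024 * 1024; // hypothetical rate cap

    // Send every file on the calling (driver) thread, smallest first, and
    // sleep whenever we get ahead of the byte rate -- no send threadpool.
    static long sendAll(long[] fileSizes) {
        long[] sorted = fileSizes.clone();
        java.util.Arrays.sort(sorted); // smallest files first, as in the PR
        long sent = 0;
        final long start = System.nanoTime();
        for (long size : sorted) {
            sent += size; // stand-in for the real chunked file transfer
            long expectedNanos = sent * 1_000_000_000L / MAX_BYTES_PER_SEC;
            long elapsed = System.nanoTime() - start;
            if (elapsed < expectedNanos) {
                try {
                    Thread.sleep((expectedNanos - elapsed) / 1_000_000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop throttling, keep the flag
                }
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(sendAll(new long[]{1024, 4096, 512})); // prints 5632
    }
}
```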

I've certainly noticed this in the past.

that's a nice side-effect of this simplification

@s1monw s1monw force-pushed the s1monw:trash_recovery_threads branch 2 times, most recently Dec 22, 2015

Remove recovery threadpools and throttle outgoing recoveries on the master

Today we throttle recoveries only for incoming recoveries. Nodes that have a lot
of primaries can get overloaded due to too many recoveries. To still keep that at bay
we limit the number of threads that are sending files to the target to overcome this problem.

The right solution here is to also throttle the outgoing recoveries that are today unbounded on
the master and don't start the recovery until we have enough resources on both source and target nodes.

The concurrency aspects of the recovery source also added a lot of complexity and additional threadpools
that are hard to configure. This commit removes the concurrent streams notion completely and sends files
in the thread that drives the recovery, simplifying the recovery code considerably.
Outgoing recoveries are now throttled on the master via an allocation decider.

@s1monw s1monw force-pushed the s1monw:trash_recovery_threads branch to f5e4cd4 Dec 22, 2015

@s1monw

Contributor Author

commented Dec 22, 2015

@bleskes @dakrone I rebased this to current master - reviews would be appreciated

}
}
}
// if (outgoing != value.outgoing) {

@dakrone

dakrone Dec 22, 2015

Member

Can you add a comment for why this block is commented out? (why we should keep it I mean)

@s1monw

s1monw Dec 22, 2015

Author Contributor

oh, this is a leftover from debugging, I will remove it

for (ShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
// remove dangling replicas that are initializing for primary shards
changed |= failReplicasForUnassignedPrimary(allocation, shardEntry);

@dakrone

dakrone Dec 22, 2015

Member

Thank you for factoring this into a function!

corruptedEngine = handleExecutionException(store, corruptedEngine, exceptions, md, t.getCause());
} catch (InterruptedException t) {
corruptedEngine = handleExecutionException(store, corruptedEngine, exceptions, md, t);
ArrayUtil.timSort(files, (a,b) -> Long.compare(a.length(), b.length())); // send smallest first

@dakrone

dakrone Dec 22, 2015

Member

Out of curiosity (entirely out of the scope of this PR), do you think it would be worthwhile to implement the index priority logic here so higher priority indices get relocated before lower priority indices?

@s1monw

s1monw Dec 22, 2015

Author Contributor

all these files belong to the same index shard... - one RecoverySourceHandler per shard

@dakrone

dakrone Dec 22, 2015

Member

Ahh okay, makes sense.

for (int i = 0; i < files.length; i++) {
final StoreFileMetaData md = files[i];
try (final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
// it's fine that we are only having the indexInput int he try/with block. The copy methods handles

@dakrone

dakrone Dec 22, 2015

Member

minor nit: "int he" -> "in the"

@dakrone

Member

commented Dec 22, 2015

LGTM

s1monw added a commit that referenced this pull request Dec 28, 2015

Merge pull request #15372 from s1monw/trash_recovery_threads
Remove recovery threadpools and throttle outgoing recoveries on the master

@s1monw s1monw merged commit ba755a5 into elastic:master Dec 28, 2015

1 check passed

CLA: Commit author is a member of Elasticsearch

@s1monw s1monw deleted the s1monw:trash_recovery_threads branch Dec 28, 2015

n0othing added a commit that referenced this pull request Dec 15, 2016

[DOCS] Remove indices.recovery.concurrent_streams from breaking changes
As per #15372 `indices.recovery.concurrent_streams` was replaced by `cluster.routing.allocation.node_concurrent_recoveries`

n0othing added a commit that referenced this pull request Dec 15, 2016

[DOCS] Remove indices.recovery.concurrent_streams from breaking changes (#22208)

As per #15372 `indices.recovery.concurrent_streams` was replaced by `cluster.routing.allocation.node_concurrent_recoveries`

pickypg added a commit that referenced this pull request Dec 15, 2016

[DOCS] Remove indices.recovery.concurrent_streams from breaking changes (#22208)

As per #15372 `indices.recovery.concurrent_streams` was replaced by `cluster.routing.allocation.node_concurrent_recoveries`

pickypg added a commit that referenced this pull request Dec 15, 2016

[DOCS] Remove indices.recovery.concurrent_streams from breaking changes (#22208)

As per #15372 `indices.recovery.concurrent_streams` was replaced by `cluster.routing.allocation.node_concurrent_recoveries`