Skip to content

Commit

Permalink
Reroute eagerly on shard started events
Browse files Browse the repository at this point in the history
We have an optimization where we try to delay reroute after we processed the shard started events to try and combine a few into the same event. With teh queueing of shard started events in places, we don't need to do it, and we can reroute right away, which will actually reduce the amount of cluster state events we send.

This will also have a nice side effect of not missing on "waitForRelocatingShards(0)" on cluster health checks since relocations will happen right away.

closes #3417
  • Loading branch information
kimchy committed Jul 31, 2013
1 parent 2f8a397 commit e3480a1
Showing 1 changed file with 3 additions and 30 deletions.
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
Expand All @@ -45,7 +44,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.routing.ImmutableShardRouting.readShardRoutingEntry;
Expand All @@ -61,7 +59,6 @@ public class ShardStateAction extends AbstractComponent {
private final ThreadPool threadPool;

private final BlockingQueue<ShardRouting> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
private final AtomicBoolean rerouteRequired = new AtomicBoolean();

@Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
Expand Down Expand Up @@ -145,7 +142,7 @@ private void innerShardStarted(final ShardRouting shardRouting, final String rea
// process started events as fast as possible, to make shards available
startedShardsQueue.add(shardRouting);

clusterService.submitStateUpdateTask("shard-started (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("shard-started (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {

Expand Down Expand Up @@ -190,8 +187,8 @@ public ClusterState execute(ClusterState currentState) {
if (logger.isDebugEnabled()) {
logger.debug("applying started shards {}, reason [{}]", shards, reason);
}
// we don't do reroute right away, we do it after publishing the fact that it was started
RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shards, false);

RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shards, true);
if (!routingResult.changed()) {
return currentState;
}
Expand All @@ -202,30 +199,6 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
rerouteRequired.set(true);
clusterService.submitStateUpdateTask("reroute post shard-started (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (rerouteRequired.compareAndSet(true, false)) {
RoutingAllocation.Result routingResult = allocationService.reroute(currentState);
if (!routingResult.changed()) {
return currentState;
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
} else {
return currentState;
}
}

@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
});
}
});
}

Expand Down

0 comments on commit e3480a1

Please sign in to comment.