Commit: Remove scheduled routing

Today, we have a scheduled reroute that kicks in every 10 seconds and checks
whether a reroute is needed. We use it when adding nodes, since we don't
reroute right away once a node is added, to give a time window for additional
nodes to join.

We already have the recover-after-nodes settings to wait for enough nodes to
be added, and the schedule's usefulness also depends on where in the 10s
window you happen to land, so sometimes it is not effective at all. In
general, it is a historic leftover from the times before we had
recover-after-nodes and friends.

This change removes the 10s scheduling, simplifies RoutingService, and adds an
explicit reroute when a node is added to the cluster. It also adds unit tests
for RoutingService.

closes #11776
kimchy committed Jun 23, 2015
1 parent f9bd3db commit 9223ceb
Showing 12 changed files with 190 additions and 121 deletions.
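
At its core, the change replaces a fixed-delay poller with explicit,
deduplicated reroute requests. A minimal, self-contained sketch of that
pattern in plain Java follows; ExplicitReroute and its println logging are
illustrative stand-ins, not the actual Elasticsearch ClusterService plumbing:

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative stand-in for the new RoutingService behavior: callers request
// a reroute explicitly, and concurrent requests collapse into one.
public class ExplicitReroute {

    private final AtomicBoolean rerouting = new AtomicBoolean();

    public void reroute(String reason) {
        // Only one reroute may be pending; later callers piggyback on it.
        if (rerouting.compareAndSet(false, true) == false) {
            System.out.println("already has pending reroute, ignoring " + reason);
            return;
        }
        System.out.println("rerouting (" + reason + ")");
        // The real service submits a high-priority cluster state update task
        // here and clears the flag when that task executes or fails.
        rerouting.set(false);
    }

    public static void main(String[] args) {
        ExplicitReroute service = new ExplicitReroute();
        service.reroute("node joined");       // runs immediately
        service.reroute("async_shard_fetch"); // runs again, nothing pending
    }
}

In the real service (see the RoutingService diff below) the flag is cleared
inside the submitted cluster state update task, so requests arriving while a
reroute is queued are absorbed by the pending one.
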
@@ -191,7 +191,7 @@ public void onFailure(String source, Throwable t) {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState != newState && newState.getRoutingNodes().hasUnassigned()) {
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
routingService.scheduleReroute();
routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
}
}
});
100 changes: 27 additions & 73 deletions src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
@@ -19,9 +19,7 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
@@ -33,12 +31,9 @@
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;

/**
* A {@link RoutingService} listens to clusters state. When this service
* receives a {@link ClusterChangedEvent} the cluster state will be verified and
@@ -53,21 +48,13 @@
*/
public class RoutingService extends AbstractLifecycleComponent<RoutingService> implements ClusterStateListener {

private static final String CLUSTER_UPDATE_TASK_SOURCE = "routing-table-updater";

private final ThreadPool threadPool;
private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";

final ThreadPool threadPool;
private final ClusterService clusterService;

private final AllocationService allocationService;

private final TimeValue schedule;

private volatile boolean routingTableDirty = false;

private volatile Future scheduledRoutingTableFuture;
private AtomicBoolean rerouting = new AtomicBoolean();

private volatile long registeredNextDelaySetting = Long.MAX_VALUE;
private volatile ScheduledFuture registeredNextDelayFuture;

@@ -77,63 +64,39 @@ public RoutingService(Settings settings, ThreadPool threadPool, ClusterService c
this.threadPool = threadPool;
this.clusterService = clusterService;
this.allocationService = allocationService;
this.schedule = componentSettings.getAsTime("schedule", timeValueSeconds(10));
clusterService.addFirst(this);
if (clusterService != null) {
clusterService.addFirst(this);
}
}

@Override
protected void doStart() throws ElasticsearchException {
protected void doStart() {
}

@Override
protected void doStop() throws ElasticsearchException {
protected void doStop() {
}

@Override
protected void doClose() throws ElasticsearchException {
FutureUtils.cancel(scheduledRoutingTableFuture);
scheduledRoutingTableFuture = null;
protected void doClose() {
FutureUtils.cancel(registeredNextDelayFuture);
clusterService.remove(this);
}

/** make sure that a reroute will be done by the next scheduled check */
public void scheduleReroute() {
routingTableDirty = true;
/**
* Initiates a reroute.
*/
public final void reroute(String reason) {
performReroute(reason);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.source().equals(CLUSTER_UPDATE_TASK_SOURCE)) {
if (event.source().startsWith(CLUSTER_UPDATE_TASK_SOURCE)) {
// that's us, ignore this event
return;
}
if (event.state().nodes().localNodeMaster()) {
// we are master, schedule the routing table updater
if (scheduledRoutingTableFuture == null) {
// a new master (us), make sure we reroute shards
routingTableDirty = true;
scheduledRoutingTableFuture = threadPool.scheduleWithFixedDelay(new RoutingTableUpdater(), schedule);
}
if (event.nodesRemoved()) {
// if nodes were removed, we don't want to wait for the scheduled task
// since we want to get primary election as fast as possible
routingTableDirty = true;
reroute();
// Commented out since we make sure to reroute whenever shards changes state or metadata changes state
// } else if (event.routingTableChanged()) {
// routingTableDirty = true;
// reroute();
} else {
if (event.nodesAdded()) {
for (DiscoveryNode node : event.nodesDelta().addedNodes()) {
if (node.dataNode()) {
routingTableDirty = true;
break;
}
}
}
}

// figure out when the next unassigned allocation needs to happen from now. If this is larger than or equal
// to the last time we checked and scheduled, we are guaranteed to have a reroute until then, so no need
// to schedule again
@@ -146,9 +109,8 @@ public void clusterChanged(ClusterChangedEvent event) {
registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
routingTableDirty = true;
registeredNextDelaySetting = Long.MAX_VALUE;
reroute();
reroute("assign delayed unassigned shards");
}

@Override
@@ -159,25 +121,26 @@ public void onFailure(Throwable t) {
} else {
logger.trace("no need to schedule reroute due to delayed unassigned, next_delay_setting [{}], registered [{}]", nextDelaySetting, registeredNextDelaySetting);
}
} else {
FutureUtils.cancel(scheduledRoutingTableFuture);
scheduledRoutingTableFuture = null;
}
}

private void reroute() {
// visible for testing
long getRegisteredNextDelaySetting() {
return this.registeredNextDelaySetting;
}

// visible for testing
void performReroute(String reason) {
try {
if (!routingTableDirty) {
return;
}
if (lifecycle.stopped()) {
return;
}
if (rerouting.compareAndSet(false, true) == false) {
logger.trace("already has pending reroute, ignoring");
logger.trace("already has pending reroute, ignoring {}", reason);
return;
}
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, Priority.HIGH, new ClusterStateUpdateTask() {
logger.trace("rerouting {}", reason);
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
rerouting.set(false);
@@ -206,19 +169,10 @@ public void onFailure(String source, Throwable t) {
}
}
});
routingTableDirty = false;
} catch (Throwable e) {
rerouting.set(false);
ClusterState state = clusterService.state();
logger.warn("Failed to reroute routing table, current state:\n{}", e, state.prettyPrint());
}
}

private class RoutingTableUpdater implements Runnable {

@Override
public void run() {
reroute();
logger.warn("failed to reroute routing table, current state:\n{}", e, state.prettyPrint());
}
}
}
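
The package-private performReroute and the constructor's null check on
clusterService act as test seams for the new unit tests the commit message
mentions. A rough sketch of how a test double might use them; the names
TestRoutingService and reroutes are hypothetical, not taken from the commit:

package org.elasticsearch.cluster.routing; // performReroute is package-private

import java.util.concurrent.atomic.AtomicInteger;

import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.threadpool.ThreadPool;

// Hypothetical test double: counts reroute requests instead of submitting
// cluster state update tasks.
class TestRoutingService extends RoutingService {

    final AtomicInteger reroutes = new AtomicInteger();

    TestRoutingService(ThreadPool threadPool) {
        // null ClusterService/AllocationService are tolerated because the
        // constructor only registers the listener when clusterService != null;
        // note that close() would NPE here, so the double is never closed
        super(ImmutableSettings.settingsBuilder().build(), threadPool, null, null);
    }

    @Override
    void performReroute(String reason) {
        reroutes.incrementAndGet(); // record instead of rerouting for real
    }
}

A test can then call reroute("test") any number of times and assert on the
reroutes counter, or use getRegisteredNextDelaySetting() to verify the
delayed-unassigned scheduling path.
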
@@ -174,7 +174,9 @@ public ClusterState execute(ClusterState currentState) {
nodesBuilder.put(discovery.localNode);
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
return ClusterState.builder(currentState).nodes(nodesBuilder).build();
ClusterState updatedState = ClusterState.builder(currentState).nodes(nodesBuilder).build();
RoutingAllocation.Result routingResult = master.allocationService.reroute(ClusterState.builder(updatedState).build());
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}

@Override
@@ -950,7 +950,10 @@ public ClusterState execute(ClusterState currentState) {
if (modified) {
stateBuilder.nodes(nodesBuilder);
}
return stateBuilder.build();
currentState = stateBuilder.build();
// eagerly run reroute to apply the node addition
RoutingAllocation.Result result = allocationService.reroute(currentState);
return ClusterState.builder(currentState).routingResult(result).build();
}

@Override
@@ -32,17 +32,12 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
@@ -60,7 +55,6 @@

import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
*
@@ -73,8 +67,7 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA

private final TransportNodesListGatewayStartedShards startedAction;
private final TransportNodesListShardStoreMetaData storeAction;
private ClusterService clusterService;
private AllocationService allocationService;
private RoutingService routingService;

private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>> asyncFetchStore = ConcurrentCollections.newConcurrentMap();
@@ -90,9 +83,8 @@ public LocalGatewayAllocator(Settings settings, TransportNodesListGatewayStarted
logger.debug("using initial_shards [{}]", initialShards);
}

public void setReallocation(final ClusterService clusterService, final AllocationService allocationService) {
this.clusterService = clusterService;
this.allocationService = allocationService;
public void setReallocation(final ClusterService clusterService, final RoutingService routingService) {
this.routingService = routingService;
clusterService.add(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
@@ -541,8 +533,6 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {
return changed;
}

private final AtomicBoolean rerouting = new AtomicBoolean();

class InternalAsyncFetch<T extends NodeOperationResponse> extends AsyncShardFetch<T> {

public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List<? extends NodesOperationResponse<T>, T> action) {
@@ -551,30 +541,8 @@ public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List<?

@Override
protected void reroute(ShardId shardId, String reason) {
if (rerouting.compareAndSet(false, true) == false) {
logger.trace("{} already has pending reroute, ignoring {}", shardId, reason);
return;
}
clusterService.submitStateUpdateTask("async_shard_fetch", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
rerouting.set(false);
if (currentState.nodes().masterNode() == null) {
return currentState;
}
RoutingAllocation.Result routingResult = allocationService.reroute(currentState);
if (!routingResult.changed()) {
return currentState;
}
return ClusterState.builder(currentState).routingResult(routingResult).build();
}

@Override
public void onFailure(String source, Throwable t) {
rerouting.set(false);
logger.warn("failed to perform reroute post async fetch for {}", t, source);
}
});
logger.trace("{} scheduling reroute for {}", shardId, reason);
routingService.reroute("async_shard_fetch");
}
}
}
@@ -259,7 +259,7 @@ public Node start() {
injector.getInstance(RestController.class).start();

// TODO hack around circular dependencies problems
injector.getInstance(LocalGatewayAllocator.class).setReallocation(injector.getInstance(ClusterService.class), injector.getInstance(AllocationService.class));
injector.getInstance(LocalGatewayAllocator.class).setReallocation(injector.getInstance(ClusterService.class), injector.getInstance(RoutingService.class));

DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start();
discoService.waitForInitialState();
@@ -47,7 +47,6 @@ public class PercolatorStressBenchmark {

public static void main(String[] args) throws Exception {
Settings settings = settingsBuilder()
.put("cluster.routing.schedule", 200, TimeUnit.MILLISECONDS)
.put("gateway.type", "none")
.put(SETTING_NUMBER_OF_SHARDS, 4)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
@@ -60,7 +60,6 @@ protected int numberOfReplicas() {
@Test
public void testSimpleAwareness() throws Exception {
Settings commonSettings = ImmutableSettings.settingsBuilder()
.put("cluster.routing.schedule", "10ms")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build();

