Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid extra reroutes of delayed shards in RoutingService #12532

Merged
merged 1 commit into from Aug 5, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -272,11 +272,15 @@ private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, Cluste
concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
} catch (IndexMissingException e) {
// one of the specified indices is not there - treat it as RED.
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState, numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState));
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(),
Strings.EMPTY_ARRAY, clusterState, numberOfPendingTasks, numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), settings, clusterState));
response.status = ClusterHealthStatus.RED;
return response;
}

return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState));
return new ClusterHealthResponse(clusterName.value(),
concreteIndices, clusterState, numberOfPendingTasks, numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), settings, clusterState));
}
}
51 changes: 37 additions & 14 deletions src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
Expand Up @@ -57,6 +57,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
private AtomicBoolean rerouting = new AtomicBoolean();
private volatile long registeredNextDelaySetting = Long.MAX_VALUE;
private volatile ScheduledFuture registeredNextDelayFuture;
private volatile long unassignedShardsAllocatedTimestamp = 0;

@Inject
public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService) {
Expand Down Expand Up @@ -87,6 +88,19 @@ public AllocationService getAllocationService() {
return this.allocationService;
}

/**
* Update the last time the allocator tried to assign unassigned shards
*
* This is used so that both the GatewayAllocator and RoutingService use a
* consistent timestamp for comparing which shards have been delayed to
* avoid a race condition where GatewayAllocator thinks the shard should
* be delayed and the RoutingService thinks it has already passed the delay
* and that the GatewayAllocator has/will handle it.
*/
public void setUnassignedShardsAllocatedTimestamp(long timeInMillis) {
this.unassignedShardsAllocatedTimestamp = timeInMillis;
}

/**
* Initiates a reroute.
*/
Expand All @@ -108,20 +122,29 @@ public void clusterChanged(ClusterChangedEvent event) {
if (nextDelaySetting > 0 && nextDelaySetting < registeredNextDelaySetting) {
FutureUtils.cancel(registeredNextDelayFuture);
registeredNextDelaySetting = nextDelaySetting;
TimeValue nextDelay = TimeValue.timeValueMillis(UnassignedInfo.findNextDelayedAllocationIn(settings, event.state()));
logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]", UnassignedInfo.getNumberOfDelayedUnassigned(settings, event.state()), nextDelay);
registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
registeredNextDelaySetting = Long.MAX_VALUE;
reroute("assign delayed unassigned shards");
}

@Override
public void onFailure(Throwable t) {
logger.warn("failed to schedule/execute reroute post unassigned shard", t);
}
});
// We use System.currentTimeMillis here because we want the
// next delay from the "now" perspective, rather than the
// delay from the last time the GatewayAllocator tried to
// assign/delay the shard
TimeValue nextDelay = TimeValue.timeValueMillis(UnassignedInfo.findNextDelayedAllocationIn(System.currentTimeMillis(), settings, event.state()));
int unassignedDelayedShards = UnassignedInfo.getNumberOfDelayedUnassigned(unassignedShardsAllocatedTimestamp, settings, event.state());
if (unassignedDelayedShards > 0) {
logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]",
unassignedDelayedShards, nextDelay);
registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
registeredNextDelaySetting = Long.MAX_VALUE;
reroute("assign delayed unassigned shards");
}

@Override
public void onFailure(Throwable t) {
logger.warn("failed to schedule/execute reroute post unassigned shard", t);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably want to rest here as well: registeredNextDelaySetting = Long.MAX_VALUE;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes good idea, otherwise it assumes one was running.

registeredNextDelaySetting = Long.MAX_VALUE;
}
});
}
} else {
logger.trace("no need to schedule reroute due to delayed unassigned, next_delay_setting [{}], registered [{}]", nextDelaySetting, registeredNextDelaySetting);
}
Expand Down
Expand Up @@ -167,12 +167,12 @@ public long getAllocationDelayTimeoutSetting(Settings settings, Settings indexSe
/**
* The time in millisecond until this unassigned shard can be reassigned.
*/
public long getDelayAllocationExpirationIn(Settings settings, Settings indexSettings) {
public long getDelayAllocationExpirationIn(long unassignedShardsAllocatedTimestamp, Settings settings, Settings indexSettings) {
long delayTimeout = getAllocationDelayTimeoutSetting(settings, indexSettings);
if (delayTimeout == 0) {
return 0;
}
long delta = System.currentTimeMillis() - timestamp;
long delta = unassignedShardsAllocatedTimestamp - timestamp;
// account for time drift, treat it as no timeout
if (delta < 0) {
return 0;
Expand All @@ -184,12 +184,12 @@ public long getDelayAllocationExpirationIn(Settings settings, Settings indexSett
/**
* Returns the number of shards that are unassigned and currently being delayed.
*/
public static int getNumberOfDelayedUnassigned(Settings settings, ClusterState state) {
public static int getNumberOfDelayedUnassigned(long unassignedShardsAllocatedTimestamp, Settings settings, ClusterState state) {
int count = 0;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings());
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(unassignedShardsAllocatedTimestamp, settings, indexMetaData.getSettings());
if (delay > 0) {
count++;
}
Expand Down Expand Up @@ -219,12 +219,12 @@ public static long findSmallestDelayedAllocationSetting(Settings settings, Clust
/**
* Finds the next (closest) delay expiration of an unassigned shard. Returns 0 if there are none.
*/
public static long findNextDelayedAllocationIn(Settings settings, ClusterState state) {
public static long findNextDelayedAllocationIn(long unassignedShardsAllocatedTimestamp, Settings settings, ClusterState state) {
long nextDelay = Long.MAX_VALUE;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
long nextShardDelay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings());
long nextShardDelay = shard.unassignedInfo().getDelayAllocationExpirationIn(unassignedShardsAllocatedTimestamp, settings, indexMetaData.getSettings());
if (nextShardDelay > 0 && nextShardDelay < nextDelay) {
nextDelay = nextShardDelay;
}
Expand Down
Expand Up @@ -145,6 +145,11 @@ private boolean recoverOnAnyNode(@IndexSettings Settings idxSettings) {

@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
// Take a snapshot of the current time and tell the RoutingService
// about it, so it will use a consistent timestamp for delays
long lastAllocateUnassignedRun = System.currentTimeMillis();
this.routingService.setUnassignedShardsAllocatedTimestamp(lastAllocateUnassignedRun);

boolean changed = false;
DiscoveryNodes nodes = allocation.nodes();
RoutingNodes routingNodes = allocation.routingNodes();
Expand Down Expand Up @@ -526,7 +531,7 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {
// note: we only care about replica in delayed allocation, since if we have an unassigned primary it
// will anyhow wait to find an existing copy of the shard to be allocated
// note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings());
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(lastAllocateUnassignedRun, settings, indexMetaData.getSettings());
if (delay > 0) {
logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay));
/**
Expand Down
Expand Up @@ -19,11 +19,13 @@

package org.elasticsearch.cluster.allocation;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
Expand All @@ -33,15 +35,19 @@
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;

import java.io.File;
Expand Down Expand Up @@ -162,6 +168,45 @@ public void rerouteWithAllocateLocalGateway_enableAllocationSettings() throws Ex
rerouteWithAllocateLocalGateway(commonSettings);
}

/**
* Test that we don't miss any reroutes when concurrent_recoveries
* is set very low and there are a large number of unassigned shards.
*/
@Test
@LuceneTestCase.Slow
public void testDelayWithALargeAmountOfShards() throws Exception {
Settings commonSettings = settingsBuilder()
.put("gateway.type", "local")
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 1)
.build();
logger.info("--> starting 4 nodes");
String node_1 = internalCluster().startNode(commonSettings);
internalCluster().startNode(commonSettings);
internalCluster().startNode(commonSettings);
internalCluster().startNode(commonSettings);

assertThat(cluster().size(), equalTo(4));
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

logger.info("--> create indices");
for (int i = 0; i < 25; i++) {
client().admin().indices().prepareCreate("test" + i)
.setSettings(settingsBuilder()
.put("index.number_of_shards", 5).put("index.number_of_replicas", 1)
.put("index.unassigned.node_left.delayed_timeout", randomIntBetween(250, 1000) + "ms"))
.execute().actionGet();
}

ensureGreen(TimeValue.timeValueMinutes(1));

logger.info("--> stopping node1");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1));

// This might run slowly on older hardware
ensureGreen(TimeValue.timeValueMinutes(2));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yikes! Is that really 2 minutes of 1 cpu running 100%?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of our CI nodes are reeeeeaaaalllly slow, and since this creates 25 indices with 10 total shards each, I just wanted to make sure it doesn't time out. I do not expect it to take 2 minutes regardless (it's much faster for me). Another reason this test in annotated with @Slow

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we configure the delayed allocation to not be the default (1m) but something high enough to trigger what we are trying to fix, like 200ms? This will speed up the test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I already randomize it between 4 and 15 seconds (since it should find the lowest delay setting), but I can lower it even more.

}

private void rerouteWithAllocateLocalGateway(Settings commonSettings) throws Exception {
logger.info("--> starting 2 nodes");
String node_1 = internalCluster().startNode(commonSettings);
Expand Down
Expand Up @@ -27,7 +27,9 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -91,6 +93,7 @@ public void testNoDelayedUnassigned() throws Exception {
}

@Test
@TestLogging("_root:DEBUG")
public void testDelayedUnassignedScheduleReroute() throws Exception {
AllocationService allocation = createAllocationService();
MetaData metaData = MetaData.builder()
Expand All @@ -111,6 +114,10 @@ public void testDelayedUnassignedScheduleReroute() throws Exception {
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// We need to update the routing service's last attempted run to
// signal that the GatewayAllocator tried to allocated it but
// it was delayed
routingService.setUnassignedShardsAllocatedTimestamp(System.currentTimeMillis());
ClusterState newState = clusterState;

routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
Expand All @@ -124,6 +131,44 @@ public void run() {
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
}

@Test
public void testDelayedUnassignedDoesNotRerouteForNegativeDelays() throws Exception {
AllocationService allocation = createAllocationService();
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(ImmutableSettings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(false));
// remove node2 and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// Set it in the future so the delay will be negative
routingService.setUnassignedShardsAllocatedTimestamp(System.currentTimeMillis() + TimeValue.timeValueMinutes(1).millis());

ClusterState newState = clusterState;

routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(routingService.hasReroutedAndClear(), equalTo(false));

// verify the registration has been updated
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(100L));
}
});
}

private class TestRoutingService extends RoutingService {

private AtomicBoolean rerouted = new AtomicBoolean();
Expand Down