Skip to content

Commit

Permalink
Failed shards could be re-assigned to the same nodes if multiple repl…
Browse files Browse the repository at this point in the history
…icas failed at once

After a shard fails on a node we assign a new replica on another node. This is important in order to avoid failing again due to node specific problems. In the rare case where two different replicas of the same shard failed in a short time span, we may fail to do so and assign one of them back to the node it's currently on. This happens if both shard failed events are processed within the same batch on the master.

Closes #5725
  • Loading branch information
bleskes committed Apr 8, 2014
1 parent abc453d commit 1f25a9d
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 18 deletions.
Expand Up @@ -29,7 +29,9 @@
import org.elasticsearch.index.shard.ShardId;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* The {@link RoutingAllocation} keep the state of the current allocation
Expand Down Expand Up @@ -107,7 +109,7 @@ public RoutingExplanations explanations() {

private final ClusterInfo clusterInfo;

private Map<ShardId, String> ignoredShardToNodes = null;
private Map<ShardId, Set<String>> ignoredShardToNodes = null;

private boolean ignoreDisable = false;

Expand Down Expand Up @@ -197,13 +199,22 @@ public boolean debugDecision() {

public void addIgnoreShardForNode(ShardId shardId, String nodeId) {
if (ignoredShardToNodes == null) {
ignoredShardToNodes = new HashMap<ShardId, String>();
ignoredShardToNodes = new HashMap<ShardId, Set<String>>();
}
ignoredShardToNodes.put(shardId, nodeId);
Set<String> nodes = ignoredShardToNodes.get(shardId);
if (nodes == null) {
nodes = new HashSet<String>();
ignoredShardToNodes.put(shardId, nodes);
}
nodes.add(nodeId);
}

public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) {
return ignoredShardToNodes != null && nodeId.equals(ignoredShardToNodes.get(shardId));
if (ignoredShardToNodes == null) {
return false;
}
Set<String> nodes = ignoredShardToNodes.get(shardId);
return nodes != null && nodes.contains(nodeId);
}

/**
Expand Down
Expand Up @@ -60,11 +60,7 @@
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
import org.elasticsearch.indices.recovery.*;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.HashMap;
Expand Down
Expand Up @@ -23,17 +23,16 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;

import java.util.ArrayList;

import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.*;
Expand Down Expand Up @@ -63,8 +62,8 @@ public void testFailedShardPrimaryRelocatingToAndFrom() {

logger.info("--> adding 2 nodes on same rack and do rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("node1"))
.put(newNode("node2"))
.put(newNode("node1"))
.put(newNode("node2"))
).build();

RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
Expand All @@ -85,7 +84,7 @@ public void testFailedShardPrimaryRelocatingToAndFrom() {

logger.info("--> adding additional node");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node3"))
.put(newNode("node3"))
).build();
rerouteResult = allocation.reroute(clusterState);
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
Expand All @@ -101,7 +100,7 @@ public void testFailedShardPrimaryRelocatingToAndFrom() {

logger.info("--> moving primary shard to node3");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(
new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
);
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
Expand All @@ -117,7 +116,7 @@ public void testFailedShardPrimaryRelocatingToAndFrom() {

logger.info("--> moving primary shard to node3");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(
new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
);
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
Expand Down Expand Up @@ -273,6 +272,63 @@ public void firstAllocationFailureSingleNode() {
assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING, 0)).changed(), equalTo(false));
}

@Test
public void singleShardMultipleAllocationFailures() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());

logger.info("Building initial routing table");
int numberOfReplicas = scaledRandomIntBetween(2, 10);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(numberOfReplicas))
.build();

RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();

ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();

logger.info("Adding {} nodes and performing rerouting", numberOfReplicas + 1);
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder();
for (int i = 0; i < numberOfReplicas + 1; i++) {
nodeBuilder.put(newNode("node" + Integer.toString(i)));
}
clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build();
while (!clusterState.routingTable().shardsWithState(UNASSIGNED).isEmpty()) {
// start all initializing
clusterState = ClusterState.builder(clusterState)
.routingTable(strategy
.applyStartedShards(clusterState, clusterState.routingTable().shardsWithState(INITIALIZING)).routingTable()
)
.build();
// and assign more unassigned
clusterState = ClusterState.builder(clusterState).routingTable(strategy.reroute(clusterState).routingTable()).build();
}

int shardsToFail = randomIntBetween(1, numberOfReplicas);
ArrayList<ShardRouting> failedShards = new ArrayList<ShardRouting>();
RoutingNodes routingNodes = clusterState.routingNodes();
for (int i = 0; i < shardsToFail; i++) {
String n = "node" + Integer.toString(randomInt(numberOfReplicas));
logger.info("failing shard on node [{}]", n);
ShardRouting shardToFail = routingNodes.node(n).get(0);
failedShards.add(new MutableShardRouting(shardToFail));
}

routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable();

clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
for (ShardRouting failedShard : failedShards) {
if (!routingNodes.node(failedShard.currentNodeId()).isEmpty()) {
fail("shard " + failedShard + " was re-assigned to it's node");
}
}
}

@Test
public void firstAllocationFailureTwoNodes() {
AllocationService strategy = createAllocationService(settingsBuilder()
Expand Down

0 comments on commit 1f25a9d

Please sign in to comment.