Permalink
Browse files

Primary shard failure with initializing replica shards can cause the replica shard to cause allocation failures


fixes #2592
  • Loading branch information...
kimchy committed Jan 25, 2013
1 parent a7bb3c2 commit 042a5d02d9e12eca77bec39acf4496cbc1be716d
@@ -20,7 +20,6 @@
package org.elasticsearch.cluster.routing.allocation;
import com.google.common.collect.Lists;
-
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.ClusterState;
@@ -103,7 +102,7 @@ public AllocationService(Settings settings, AllocationDeciders allocationDecider
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
Collections.shuffle(routingNodes.unassigned());
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShard);
- boolean changed = applyFailedShard(allocation, failedShard);
+ boolean changed = applyFailedShard(allocation, failedShard, true);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
@@ -165,7 +164,7 @@ public AllocationService(Settings settings, AllocationDeciders allocationDecider
// elect primaries *before* allocating unassigned, so backups of primaries that failed
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
- changed |= electPrimaries(allocation.routingNodes());
+ changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
@@ -185,13 +184,13 @@ private boolean reroute(RoutingAllocation allocation) {
// elect primaries *before* allocating unassigned, so backups of primaries that failed
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
- changed |= electPrimaries(allocation.routingNodes());
+ changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
// now allocate all the unassigned to available nodes
if (allocation.routingNodes().hasUnassigned()) {
changed |= shardsAllocators.allocateUnassigned(allocation);
// elect primaries again, in case this is needed with unassigned allocation
- changed |= electPrimaries(allocation.routingNodes());
+ changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
}
// move shards that no longer can be allocated
@@ -242,8 +241,9 @@ private boolean moveShards(RoutingAllocation allocation) {
return changed;
}
- private boolean electPrimaries(RoutingNodes routingNodes) {
+ private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false;
+ RoutingNodes routingNodes = allocation.routingNodes();
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary() && !shardEntry.assignedToNode()) {
boolean elected = false;
@@ -283,6 +283,29 @@ private boolean electPrimaries(RoutingNodes routingNodes) {
}
}
}
+
+ // go over and remove dangling replicas that are initializing, but we couldn't elect primary ones...
+ List<ShardRouting> shardsToFail = null;
+ for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
+ if (shardEntry.primary() && !shardEntry.assignedToNode()) {
+ for (RoutingNode routingNode : routingNodes.nodesToShards().values()) {
+ for (MutableShardRouting shardEntry2 : routingNode.shards()) {
+ if (shardEntry.shardId().equals(shardEntry2.shardId()) && !shardEntry2.active()) {
+ changed = true;
+ if (shardsToFail == null) {
+ shardsToFail = new ArrayList<ShardRouting>();
+ }
+ shardsToFail.add(shardEntry2);
+ }
+ }
+ }
+ }
+ }
+ if (shardsToFail != null) {
+ for (ShardRouting shardToFail : shardsToFail) {
+ applyFailedShard(allocation, shardToFail, false);
+ }
+ }
return changed;
}
@@ -310,8 +333,7 @@ private boolean deassociateDeadNodes(RoutingAllocation allocation) {
changed = true;
// now, go over all the shards routing on the node, and fail them
for (MutableShardRouting shardRouting : new ArrayList<MutableShardRouting>(node.shards())) {
- // we create a copy of the shard routing, since applyFailedShard assumes its a new copy
- applyFailedShard(allocation, shardRouting);
+ applyFailedShard(allocation, shardRouting, false);
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
// since it relies on the fact that the RoutingNode exists in the list of nodes
@@ -372,7 +394,7 @@ private boolean applyStartedShards(RoutingNodes routingNodes, Iterable<? extends
* Applies the relevant logic to handle a failed shard. Returns <tt>true</tt> if changes happened that
* require relocation.
*/
- private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard) {
+ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, boolean addToIgnoreList) {
// create a copy of the failed shard, since we assume we can change possible references to it without
// changing the state of failed shard
failedShard = new ImmutableShardRouting(failedShard);
@@ -397,8 +419,10 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
it.remove();
shardRouting.deassignNode();
- // make sure we ignore this shard on the relevant node
- allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+ if (addToIgnoreList) {
+ // make sure we ignore this shard on the relevant node
+ allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+ }
break;
}
@@ -433,8 +457,10 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
dirty = true;
shardRouting.cancelRelocation();
it.remove();
- // make sure we ignore this shard on the relevant node
- allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+ if (addToIgnoreList) {
+ // make sure we ignore this shard on the relevant node
+ allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+ }
allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
null, failedShard.primary(), ShardRoutingState.UNASSIGNED, failedShard.version() + 1));
@@ -469,8 +495,10 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
MutableShardRouting shardRouting = it.next();
if (shardRouting.equals(failedShard)) {
dirty = true;
- // make sure we ignore this shard on the relevant node
- allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+ if (addToIgnoreList) {
+ // make sure we ignore this shard on the relevant node
+ allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+ }
it.remove();
@@ -24,6 +24,7 @@
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.testng.annotations.Test;
@@ -98,4 +99,51 @@ public void testBackupElectionToPrimaryWhenPrimaryCanBeAllocatedToAnotherNode()
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node3"));
}
+
+ @Test
+ public void testRemovingInitializingReplicasIfPrimariesFails() {
+ AllocationService allocation = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
+
+ logger.info("Building initial routing table");
+
+ MetaData metaData = newMetaDataBuilder()
+ .put(newIndexMetaDataBuilder("test").numberOfShards(2).numberOfReplicas(1))
+ .build();
+
+ RoutingTable routingTable = routingTable()
+ .addAsNew(metaData.index("test"))
+ .build();
+
+ ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
+
+ logger.info("Adding two nodes and performing rerouting");
+ clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
+ RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
+ clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
+
+ logger.info("Start the primary shards");
+ RoutingNodes routingNodes = clusterState.routingNodes();
+ rerouteResult = allocation.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
+ clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
+ routingNodes = clusterState.routingNodes();
+
+ assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(2));
+ assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(2));
+
+ // now, fail one node, while the replica is initializing, and it also holds a primary
+ logger.info("--> fail node with primary");
+ String nodeIdToFail = clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId();
+ String nodeIdRemaining = nodeIdToFail.equals("node1") ? "node2" : "node1";
+ clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
+ .put(newNode(nodeIdRemaining))
+ ).build();
+ rerouteResult = allocation.reroute(clusterState);
+ clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
+ routingNodes = clusterState.routingNodes();
+
+ assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(1));
+ assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(1));
+ assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(INITIALIZING).get(0).primary(), equalTo(true));
+
+ }
}

0 comments on commit 042a5d0

Please sign in to comment.