Skip to content

Commit

Permalink
Better handling of shard failures, closes #845.
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed Apr 10, 2011
1 parent 8eab5ec commit 7d8726a
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 271 deletions.
Expand Up @@ -107,20 +107,16 @@ private void innerShardFailed(final ShardRouting shardRouting, final String reas
logger.warn("received shard failed for {}, reason [{}]", shardRouting, reason);
clusterService.submitStateUpdateTask("shard-failed (" + shardRouting + "), reason [" + reason + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingTable routingTable = currentState.routingTable();
IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index());
// if there is no routing table, the index has been deleted while it was being allocated
// which is fine, we should just ignore this
if (indexRoutingTable == null) {
return currentState;
}
if (logger.isDebugEnabled()) {
logger.debug("Applying failed shard {}, reason [{}]", shardRouting, reason);
logger.debug("Received failed shard {}, reason [{}]", shardRouting, reason);
}
RoutingAllocation.Result routingResult = shardsAllocation.applyFailedShards(currentState, newArrayList(shardRouting));
RoutingAllocation.Result routingResult = shardsAllocation.applyFailedShard(currentState, shardRouting);
if (!routingResult.changed()) {
return currentState;
}
if (logger.isDebugEnabled()) {
logger.debug("Applying failed shard {}, reason [{}]", shardRouting, reason);
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
});
Expand Down
Expand Up @@ -23,21 +23,19 @@
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;

import java.util.List;

/**
* @author kimchy (shay.banon)
*/
public class FailedRerouteAllocation extends RoutingAllocation {

private final List<? extends ShardRouting> failedShards;
private final ShardRouting failedShard;

public FailedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
public FailedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, ShardRouting failedShard) {
super(routingNodes, nodes);
this.failedShards = failedShards;
this.failedShard = failedShard;
}

public List<? extends ShardRouting> failedShards() {
return failedShards;
public ShardRouting failedShard() {
return failedShard;
}
}
Expand Up @@ -84,6 +84,11 @@ public NodeAllocations(Settings settings) {

@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Decision ret = Decision.YES;
// first, check if its in the ignored, if so, return NO
if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
return Decision.NO;
}
// now, go over the registered allocations
for (NodeAllocation allocation1 : allocations) {
Decision decision = allocation1.canAllocate(shardRouting, node, allocation);
if (decision == Decision.NO) {
Expand Down
Expand Up @@ -22,6 +22,10 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.index.shard.ShardId;

import java.util.HashMap;
import java.util.Map;

/**
* @author kimchy (shay.banon)
Expand Down Expand Up @@ -61,6 +65,8 @@ public AllocationExplanation explanation() {

private final AllocationExplanation explanation = new AllocationExplanation();

private Map<ShardId, String> ignoredShardToNodes = null;

public RoutingAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes) {
this.routingNodes = routingNodes;
this.nodes = nodes;
Expand All @@ -81,4 +87,15 @@ public DiscoveryNodes nodes() {
public AllocationExplanation explanation() {
return explanation;
}

public void addIgnoreShardForNode(ShardId shardId, String nodeId) {
if (ignoredShardToNodes == null) {
ignoredShardToNodes = new HashMap<ShardId, String>();
}
ignoredShardToNodes.put(shardId, nodeId);
}

public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) {
return ignoredShardToNodes != null && nodeId.equals(ignoredShardToNodes.get(shardId));
}
}
Expand Up @@ -76,16 +76,15 @@ public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, Li
*
* <p>If the same instance of the routing table is returned, then no change has been made.
*/
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<? extends ShardRouting> failedShards) {
public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
RoutingNodes routingNodes = clusterState.routingNodes();
FailedRerouteAllocation allocation = new FailedRerouteAllocation(routingNodes, clusterState.nodes(), failedShards);
nodeAllocations.applyFailedShards(nodeAllocations, allocation);
boolean changed = applyFailedShards(allocation);
FailedRerouteAllocation allocation = new FailedRerouteAllocation(routingNodes, clusterState.nodes(), failedShard);
boolean changed = applyFailedShard(allocation);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
// If we reroute again, the failed shard will try and be assigned to the same node, which we do no do in the applyFailedShards
// reroute(routingNodes, clusterState.nodes());
nodeAllocations.applyFailedShards(nodeAllocations, allocation);
reroute(allocation);
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}

Expand Down Expand Up @@ -384,83 +383,75 @@ private boolean applyStartedShards(RoutingNodes routingNodes, Iterable<? extends
return dirty;
}

private boolean applyFailedShards(FailedRerouteAllocation allocation) {
boolean dirty = false;
// apply shards might be called several times with the same shard, ignore it
for (ShardRouting failedShard : allocation.failedShards()) {

boolean shardDirty = false;
boolean inRelocation = failedShard.relocatingNodeId() != null;
if (inRelocation) {
RoutingNode routingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId());
if (routingNode != null) {
Iterator<MutableShardRouting> shards = routingNode.iterator();
while (shards.hasNext()) {
MutableShardRouting shard = shards.next();
if (shard.shardId().equals(failedShard.shardId())) {
shardDirty = true;
shard.deassignNode();
shards.remove();
break;
}
}
}
}

String nodeId = inRelocation ? failedShard.relocatingNodeId() : failedShard.currentNodeId();
RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(nodeId);

if (currentRoutingNode == null) {
// already failed (might be called several times for the same shard)
continue;
}
/**
* Applies the relevant logic to handle a failed shard. Returns <tt>true</tt> if changes happened that
* require relocation.
*/
private boolean applyFailedShard(FailedRerouteAllocation allocation) {
IndexRoutingTable indexRoutingTable = allocation.routingTable().index(allocation.failedShard().index());
if (indexRoutingTable == null) {
return false;
}

Iterator<MutableShardRouting> shards = currentRoutingNode.iterator();
while (shards.hasNext()) {
MutableShardRouting shard = shards.next();
if (shard.shardId().equals(failedShard.shardId())) {
shardDirty = true;
if (!inRelocation) {
ShardRouting failedShard = allocation.failedShard();

boolean shardDirty = false;
boolean inRelocation = failedShard.relocatingNodeId() != null;
if (inRelocation) {
RoutingNode routingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId());
if (routingNode != null) {
Iterator<MutableShardRouting> shards = routingNode.iterator();
while (shards.hasNext()) {
MutableShardRouting shard = shards.next();
if (shard.shardId().equals(failedShard.shardId())) {
shardDirty = true;
shard.deassignNode();
shards.remove();
} else {
shard.cancelRelocation();
break;
}
break;
}
}
}

if (!shardDirty) {
continue;
} else {
dirty = true;
}
String nodeId = inRelocation ? failedShard.relocatingNodeId() : failedShard.currentNodeId();
RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(nodeId);

// if in relocation no need to find a new target, just cancel the relocation.
if (inRelocation) {
continue;
}
if (currentRoutingNode == null) {
// already failed (might be called several times for the same shard)
return false;
}

// not in relocation so find a new target.

boolean allocated = false;
List<RoutingNode> sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh();
for (RoutingNode target : sortedNodesLeastToHigh) {
if (!target.nodeId().equals(failedShard.currentNodeId()) &&
nodeAllocations.canAllocate(failedShard, target, allocation).allocate()) {
target.add(new MutableShardRouting(failedShard.index(), failedShard.id(),
target.nodeId(), failedShard.relocatingNodeId(),
failedShard.primary(), INITIALIZING));
allocated = true;
break;
Iterator<MutableShardRouting> shards = currentRoutingNode.iterator();
while (shards.hasNext()) {
MutableShardRouting shard = shards.next();
if (shard.shardId().equals(failedShard.shardId())) {
shardDirty = true;
if (!inRelocation) {
shard.deassignNode();
shards.remove();
} else {
shard.cancelRelocation();
}
}
if (!allocated) {
// we did not manage to allocate it, put it in the unassigned
allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
null, failedShard.primary(), ShardRoutingState.UNASSIGNED));
break;
}
}
return dirty;

if (!shardDirty) {
return false;
}

// make sure we ignore this shard on the relevant node
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());

// if in relocation no need to find a new target, just cancel the relocation.
if (inRelocation) {
return true; // lets true, so we reroute in this case
}

// add the failed shard to the unassigned shards
allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
null, failedShard.primary(), ShardRoutingState.UNASSIGNED));

return true;
}
}
Expand Up @@ -79,10 +79,8 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
}

@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
for (ShardRouting shardRouting : allocation.failedShards()) {
cachedCommitPoints.remove(shardRouting.shardId());
cachedStores.remove(shardRouting.shardId());
}
cachedCommitPoints.remove(allocation.failedShard().shardId());
cachedStores.remove(allocation.failedShard().shardId());
}

@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
Expand Down

0 comments on commit 7d8726a

Please sign in to comment.