Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of failed primary replica handling #6825

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
Expand Up @@ -310,6 +310,12 @@ public List<MutableShardRouting> shardsWithState(ShardRoutingState... state) {
for (RoutingNode routingNode : this) {
shards.addAll(routingNode.shardsWithState(state));
}
for (ShardRoutingState s : state) {
if (s == ShardRoutingState.UNASSIGNED) {
Iterables.addAll(shards, unassigned());
break;
}
}
return shards;
}

Expand All @@ -319,6 +325,16 @@ public List<MutableShardRouting> shardsWithState(String index, ShardRoutingState
for (RoutingNode routingNode : this) {
shards.addAll(routingNode.shardsWithState(index, state));
}
for (ShardRoutingState s : state) {
if (s == ShardRoutingState.UNASSIGNED) {
for (MutableShardRouting unassignedShard : unassignedShards) {
if (unassignedShard.index().equals(index)) {
shards.add(unassignedShard);
}
}
break;
}
}
return shards;
}

Expand Down
Expand Up @@ -192,7 +192,7 @@ public RoutingAllocation.Result rerouteWithNoReassign(ClusterState clusterState,

// elect primaries *before* allocating unassigned, so backups of primaries that failed
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);

if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable());
Expand All @@ -210,13 +210,13 @@ private boolean reroute(RoutingAllocation allocation) {

// elect primaries *before* allocating unassigned, so backups of primaries that failed
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);

// now allocate all the unassigned to available nodes
if (allocation.routingNodes().hasUnassigned()) {
changed |= shardsAllocators.allocateUnassigned(allocation);
// elect primaries again, in case this is needed with unassigned allocation
changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
}

// move shards that no longer can be allocated
Expand Down Expand Up @@ -269,13 +269,31 @@ private boolean moveShards(RoutingAllocation allocation) {
return changed;
}

private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allocation) {
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false;
RoutingNodes routingNodes = allocation.routingNodes();
if (!routingNodes.hasUnassignedPrimaries()) {
// move out if we don't have unassigned primaries
return changed;
}

// go over and remove dangling replicas that are initializing for primary shards
List<ShardRouting> shardsToFail = Lists.newArrayList();
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
for (MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) {
if (!routing.primary() && routing.initializing()) {
shardsToFail.add(routing);
}
}
}
}
for (ShardRouting shardToFail : shardsToFail) {
changed |= applyFailedShard(allocation, shardToFail, false);
}

// now, go over and elect a new primary if possible; note, from this code block on, if one is elected,
// routingNodes.hasUnassignedPrimaries() will potentially be false
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
MutableShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry);
Expand All @@ -298,28 +316,6 @@ private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allo
}
}

// go over and remove dangling replicas that are initializing, but we couldn't elect primary ones...
List<ShardRouting> shardsToFail = null;
if (routingNodes.hasUnassignedPrimaries()) {
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
for (MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) {
if (!routing.primary()) {
changed = true;
if (shardsToFail == null) {
shardsToFail = new ArrayList<>();
}
shardsToFail.add(routing);
}
}
}
}
if (shardsToFail != null) {
for (ShardRouting shardToFail : shardsToFail) {
applyFailedShard(allocation, shardToFail, false);
}
}
}
return changed;
}

Expand Down Expand Up @@ -421,23 +417,6 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail

RoutingNodes routingNodes = allocation.routingNodes();
boolean dirty = false;
if (failedShard.primary()) {
// we have to fail the initializing replicas if the primary fails
// since they might now yet have started the recovery and then they will
// stick in the cluster-state forever since the replica has a retry logic that
// retries infinitely in that case.
List<MutableShardRouting> initializingReplicas = new ArrayList<>();
for (MutableShardRouting shard : routingNodes.assignedShards(failedShard)){
if (!shard.primary() && shard.initializing()) {
initializingReplicas.add(shard);
}
}
// we can't do this in the loop above since we modify the iterator and will get
// concurrent modification exceptions
for (MutableShardRouting shard : initializingReplicas) {
dirty |= applyFailedShard(allocation, shard, addToIgnoreList);
}
}
if (failedShard.relocatingNodeId() != null) {
// the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node)
if (failedShard.state() == INITIALIZING) {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
Expand Down Expand Up @@ -541,6 +542,18 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) throws Ela
// cancel recovery just in case we are in recovery (its fine if we are not in recovery, it will be a noop).
recoveryTarget.cancelRecovery(indexShard);
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
} else if (isPeerRecovery(shardRouting)) {
// check if there is an existing recovery going, and if so, and the source node is not the same, cancel the recovery to restart it
RecoveryStatus recoveryStatus = recoveryTarget.recoveryStatus(indexShard.shardId());
if (recoveryStatus != null && recoveryStatus.stage() != RecoveryState.Stage.DONE) {
// we have an ongoing recovery, find the source based on current routing and compare them
DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
if (!recoveryStatus.sourceNode().equals(sourceNode)) {
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
recoveryTarget.cancelRecovery(indexShard);
indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
}
}
}
}

Expand Down Expand Up @@ -630,34 +643,10 @@ private void applyInitializingShard(final RoutingTable routingTable, final Disco

// figure out where to recover from (node or disk, in which case sourceNode is null)
DiscoveryNode sourceNode = null;
if (!shardRouting.primary()) {
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
for (ShardRouting entry : shardRoutingTable) {
if (entry.primary() && entry.started()) {
// only recover from started primary, if we can't find one, we will do it next round
sourceNode = nodes.get(entry.currentNodeId());
if (sourceNode == null) {
logger.trace("can't recover replica because primary shard {} is assigned to an unknown node. ignoring.", entry);
return;
}
break;
}
}

if (sourceNode == null) {
logger.trace("can't recover replica for {} because a primary shard can not be found. ignoring.", shardRouting.shardId());
return;
}

} else if (shardRouting.relocatingNodeId() != null) {
sourceNode = nodes.get(shardRouting.relocatingNodeId());
if (sourceNode == null) {
logger.trace("can't recover from remote primary shard {} because it is assigned to an unknown node [{}]. ignoring.", shardRouting.shardId(), shardRouting.relocatingNodeId());
return;
}
if (isPeerRecovery(shardRouting)) {
sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
}


// if there is no shard, create it
if (!indexService.hasShard(shardId)) {
if (failedShards.containsKey(shardRouting.shardId())) {
Expand Down Expand Up @@ -750,6 +739,45 @@ public void onRecoveryFailed(IndexShardGatewayRecoveryException e) {
}
}

/**
 * Resolves the node a peer recovery for the given shard routing should pull from.
 * <p>
 * For a non-primary routing the source is the node holding the first primary copy that is
 * started; for a primary routing with a relocating node id the source is that relocating node.
 * Callers are expected to have verified with
 * {@link #isPeerRecovery(org.elasticsearch.cluster.routing.ShardRouting)} that peer recovery applies.
 *
 * @param routingTable the routing table used to look up the copies of the shard
 * @param nodes        the discovery nodes used to resolve node ids
 * @param shardRouting the routing entry of the shard that needs a recovery source
 * @return the source node, or {@code null} if none can be resolved (a trace message is logged)
 * @throws ElasticsearchIllegalStateException if the routing is a primary without a relocating node id
 */
private DiscoveryNode findSourceNodeForPeerRecovery(RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) {
    if (shardRouting.primary()) {
        // a primary can only peer-recover when it has a relocating node id to recover from
        if (shardRouting.relocatingNodeId() == null) {
            throw new ElasticsearchIllegalStateException("trying to find source node for peer recovery when routing state means no peer recovery: " + shardRouting);
        }
        final DiscoveryNode relocationSource = nodes.get(shardRouting.relocatingNodeId());
        if (relocationSource == null) {
            logger.trace("can't find relocation source node for shard {} because it is assigned to an unknown node [{}]. ignoring.", shardRouting.shardId(), shardRouting.relocatingNodeId());
        }
        return relocationSource;
    }

    // non-primary: only a started primary is an acceptable source, the first one found decides the outcome
    for (ShardRouting candidate : routingTable.index(shardRouting.index()).shard(shardRouting.id())) {
        if (!candidate.primary() || !candidate.started()) {
            continue;
        }
        final DiscoveryNode primaryNode = nodes.get(candidate.currentNodeId());
        if (primaryNode == null) {
            logger.trace("can't find replica source node because primary shard {} is assigned to an unknown node. ignoring.", candidate);
        }
        return primaryNode;
    }

    logger.trace("can't find replica source node for {} because a primary shard can not be found. ignoring.", shardRouting.shardId());
    return null;
}

/**
 * Tells whether the given shard routing is one that recovers from a peer node.
 *
 * @param shardRouting the routing entry to inspect
 * @return {@code true} if the routing is not a primary, or if it has a relocating node id set
 */
private boolean isPeerRecovery(ShardRouting shardRouting) {
    if (!shardRouting.primary()) {
        return true;
    }
    return shardRouting.relocatingNodeId() != null;
}

private class PeerRecoveryListener implements RecoveryTarget.RecoveryListener {

private final StartRecoveryRequest request;
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.InternalIndexShard;
Expand All @@ -41,10 +42,12 @@ public class RecoveryStatus {
final long recoveryId;
final InternalIndexShard indexShard;
final RecoveryState recoveryState;
final DiscoveryNode sourceNode;

public RecoveryStatus(long recoveryId, InternalIndexShard indexShard) {
public RecoveryStatus(long recoveryId, InternalIndexShard indexShard, DiscoveryNode sourceNode) {
this.recoveryId = recoveryId;
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.shardId = indexShard.shardId();
this.recoveryState = new RecoveryState(shardId);
recoveryState.getTimer().startTime(System.currentTimeMillis());
Expand All @@ -57,6 +60,10 @@ public RecoveryStatus(long recoveryId, InternalIndexShard indexShard) {
private volatile ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
public final Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums();

/**
 * @return the value of the {@code sourceNode} field of this recovery status
 */
public DiscoveryNode sourceNode() {
    return sourceNode;
}

public RecoveryState recoveryState() {
return recoveryState;
}
Expand Down
Expand Up @@ -163,7 +163,7 @@ public void startRecovery(final StartRecoveryRequest request, final InternalInde
return;
}
// create a new recovery status, and process...
final RecoveryStatus recoveryStatus = new RecoveryStatus(request.recoveryId(), indexShard);
final RecoveryStatus recoveryStatus = new RecoveryStatus(request.recoveryId(), indexShard, request.sourceNode());
recoveryStatus.recoveryState.setType(request.recoveryType());
recoveryStatus.recoveryState.setSourceNode(request.sourceNode());
recoveryStatus.recoveryState.setTargetNode(request.targetNode());
Expand Down