Do not start a recovery process if the primary shard is currently allocated on a node which is not part of the cluster state

If a source node disconnects during recovery, the target node will respond by canceling the recovery. Typically, the master will then remove the disconnected node from the cluster state and promote another shard copy to primary. The updated cluster state is published to all nodes, and the target node will start recovering from the new primary. However, if the loss of the node caused the node count to go below minimum_master_nodes, the master will step down and will not promote a new primary immediately. When a new master is elected, it may publish a cluster state (whose sole purpose is to announce the new master) that has not yet been updated. This caused the target node to start a recovery from a non-existent node. Previously, we aborted that recovery without cleaning up the shard, causing subsequent correct cluster states to be ignored. Instead, we should not start the recovery process at all but wait for another cluster state to come in.
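
To make the fix concrete, here is a minimal, self-contained sketch of the guard. RecoverySourceGuard, ShardCopy, and resolveSourceNode are hypothetical stand-ins for illustration, not the actual ShardRouting/DiscoveryNodes API; see the IndicesClusterStateService change below for the real code.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class RecoverySourceGuard {

    /** Hypothetical, simplified stand-in for a ShardRouting entry. */
    record ShardCopy(String nodeId, boolean primary, boolean started) {}

    /**
     * Returns the node to recover a replica from, or empty if no started
     * primary is allocated to a node that is part of the current cluster
     * state. Empty means: do not start a recovery now, wait for the next
     * cluster state to come in.
     */
    static Optional<String> resolveSourceNode(List<ShardCopy> shardTable, Set<String> clusterNodes) {
        for (ShardCopy copy : shardTable) {
            if (copy.primary() && copy.started()) {
                if (clusterNodes.contains(copy.nodeId())) {
                    return Optional.of(copy.nodeId()); // safe to recover from this node
                }
                // A started primary exists in the routing table, but its node has
                // already left the cluster state (e.g. the master stepped down
                // before re-routing) -- the exact situation behind #6024.
                return Optional.empty();
            }
        }
        return Optional.empty(); // no started primary yet: also wait for the next round
    }

    public static void main(String[] args) {
        List<ShardCopy> table = List.of(new ShardCopy("node_1", true, true));

        // "node_1" holds the started primary but is gone from the cluster state:
        // the target must not start recovering and waits for a newer cluster state.
        System.out.println(resolveSourceNode(table, Set.of("node_2")));            // Optional.empty

        // Once a cluster state containing "node_1" arrives, recovery can start.
        System.out.println(resolveSourceNode(table, Set.of("node_1", "node_2"))); // Optional[node_1]
    }
}
```

This mirrors the new sourceNode resolution block added to applyInitializingShard below: a null source node now means "retry on a later cluster state" rather than "start a recovery that can never complete".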

Closes #6024
bleskes committed May 2, 2014
1 parent 291a0fe commit c7a4e5b
Showing 3 changed files with 71 additions and 55 deletions.
@@ -367,7 +367,8 @@ private void applyMappings(ClusterChangedEvent event) {
if (sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(event.state(),
new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, indexMetaData.uuid(),
typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId()));
typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId())
);
}
}
// go over and remove mappings
@@ -489,7 +490,8 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) throws Ela
}

RoutingTable routingTable = event.state().routingTable();
RoutingNodes.RoutingNodeIterator routingNode = event.state().readOnlyRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());;
RoutingNodes.RoutingNodeIterator routingNode = event.state().readOnlyRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());

if (routingNode == null) {
failedShards.clear();
return;
@@ -551,7 +553,8 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) throws Ela

private void cleanFailedShards(final ClusterChangedEvent event) {
RoutingTable routingTable = event.state().routingTable();
RoutingNodes.RoutingNodeIterator routingNode = event.state().readOnlyRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());;
RoutingNodes.RoutingNodeIterator routingNode = event.state().readOnlyRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());

if (routingNode == null) {
failedShards.clear();
return;
@@ -611,10 +614,42 @@ private void applyInitializingShard(final RoutingTable routingTable, final Disco
return;
} else {
if (indexShard.ignoreRecoveryAttempt()) {
logger.trace("ignoring recovery instruction for an existing shard {} (shard state: [{}])", indexShard.shardId(), indexShard.state());
return;
}
}
}

// figure out where to recover from (node or disk, in which case sourceNode is null)
DiscoveryNode sourceNode = null;
if (!shardRouting.primary()) {
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
for (ShardRouting entry : shardRoutingTable) {
if (entry.primary() && entry.started()) {
// only recover from started primary, if we can't find one, we will do it next round
sourceNode = nodes.get(entry.currentNodeId());
if (sourceNode == null) {
logger.trace("can't recover replica because primary shard {} is assigned to an unknown node. ignoring.", entry);
return;
}
break;
}
}

if (sourceNode == null) {
logger.trace("can't recover replica for {} because a primary shard can not be found. ignoring.", shardRouting.shardId());
return;
}

} else if (shardRouting.relocatingNodeId() != null) {
sourceNode = nodes.get(shardRouting.relocatingNodeId());
if (sourceNode == null) {
logger.trace("can't recover from remote primary shard {} because it is assigned to an unknown node [{}]. ignoring.", shardRouting.shardId(), shardRouting.relocatingNodeId());
return;
}
}


// if there is no shard, create it
if (!indexService.hasShard(shardId)) {
if (failedShards.containsKey(shardRouting.shardId())) {
@@ -650,63 +685,45 @@ private void applyInitializingShard(final RoutingTable routingTable, final Disco
if (indexShard.ignoreRecoveryAttempt()) {
// we are already recovering (we can get to this state since the cluster event can happen several
// times while we recover)
logger.trace("ignoring recovery instruction for shard {} (shard state: [{}])", indexShard.shardId(), indexShard.state());
return;
}

if (sourceNode != null) {
try {
// we don't mark this one as relocated at the end.
// For primaries: requests in any case are routed to both when its relocating and that way we handle
// the edge case where its mark as relocated, and we might need to roll it back...
// For replicas: we are recovering a backup from a primary

RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA;
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(),
false, indexShard.store().list(), type);
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));

if (!shardRouting.primary()) {
// recovery from primary
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
for (ShardRouting entry : shardRoutingTable) {
if (entry.primary() && entry.started()) {
// only recover from started primary, if we can't find one, we will do it next round
final DiscoveryNode sourceNode = nodes.get(entry.currentNodeId());
try {
// we are recovering a backup from a primary, so no need to mark it as relocated
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(),
false, indexShard.store().list(), RecoveryState.Type.REPLICA);
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));
} catch (Throwable e) {
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
break;
}
break;
}
} catch (Throwable e) {
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
}
} else {
if (shardRouting.relocatingNodeId() == null) {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
boolean indexShouldExists = indexShardRouting.primaryAllocatedPostApi();
IndexShardGatewayService shardGatewayService = indexService.shardInjector(shardId).getInstance(IndexShardGatewayService.class);
shardGatewayService.recover(indexShouldExists, new IndexShardGatewayService.RecoveryListener() {
@Override
public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(), "after recovery from gateway");
}
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
boolean indexShouldExists = indexShardRouting.primaryAllocatedPostApi();
IndexShardGatewayService shardGatewayService = indexService.shardInjector(shardId).getInstance(IndexShardGatewayService.class);
shardGatewayService.recover(indexShouldExists, new IndexShardGatewayService.RecoveryListener() {
@Override
public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(), "after recovery from gateway");
}

@Override
public void onIgnoreRecovery(String reason) {
}
@Override
public void onIgnoreRecovery(String reason) {
}

@Override
public void onRecoveryFailed(IndexShardGatewayRecoveryException e) {
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
}
});
} else {
// relocating primaries, recovery from the relocating shard
final DiscoveryNode sourceNode = nodes.get(shardRouting.relocatingNodeId());
try {
// we don't mark this one as relocated at the end, requests in any case are routed to both when its relocating
// and that way we handle the edge case where its mark as relocated, and we might need to roll it back...
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(),
false, indexShard.store().list(), RecoveryState.Type.RELOCATION);
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));
} catch (Throwable e) {
@Override
public void onRecoveryFailed(IndexShardGatewayRecoveryException e) {
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
}
}
});
}
}

@@ -187,10 +187,9 @@ public void run() {
}

private void doRecovery(final StartRecoveryRequest request, final RecoveryStatus recoveryStatus, final RecoveryListener listener) {
if (request.sourceNode() == null) {
listener.onIgnoreRecovery(false, "No node to recover from, retry on next cluster state update");
return;
}

assert request.sourceNode() != null : "can't do a recovery without a source node";

final InternalIndexShard shard = recoveryStatus.indexShard;
if (shard == null) {
listener.onIgnoreRecovery(false, "shard missing locally, stop recovery");
@@ -161,7 +161,7 @@ public boolean apply(Object obj) {
}

@Test
@TestLogging("cluster.service:TRACE,discovery:TRACE")
@TestLogging("cluster.service:TRACE,discovery:TRACE,indices.cluster:TRACE")
public void multipleNodesShutdownNonMasterNodes() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "zen")
