Skip to content

Commit

Permalink
Recovery: refactor RecoveryTarget state management
Browse files Browse the repository at this point in the history
This commit rewrites the state controls in the RecoveryTarget family classes to make it easier to guarantee that:
- recovery resources are only cleared once there are no ongoing requests
- recovery is automatically canceled when the target shard is closed/removed
- canceled recoveries do not leave temporary files behind.

Highlights of the change:
1) All temporary files are cleared upon failure/cancel (see #7315)
2) All newly created files are always temporary
3) Doesn't list local files on the cluster state update thread (which could throw unwanted exceptions)
4) Recoveries are canceled by a listener on IndicesLifecycle.beforeIndexShardClosed, so we no longer need to cancel them explicitly.
5) Simplifies RecoveryListener to only notify when a recovery is done or failed. Removed subtleties like ignore and retry (they are dealt with internally)

Closes #8092, Closes #7315
  • Loading branch information
bleskes committed Oct 23, 2014
1 parent 1557c34 commit 24bc8d3
Show file tree
Hide file tree
Showing 9 changed files with 851 additions and 505 deletions.
Expand Up @@ -40,7 +40,6 @@
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -144,19 +143,15 @@ protected ShardRecoveryResponse shardOperation(ShardRecoveryRequest request) thr

InternalIndexService indexService = (InternalIndexService) indicesService.indexServiceSafe(request.shardId().getIndex());
InternalIndexShard indexShard = (InternalIndexShard) indexService.shardSafe(request.shardId().id());
ShardRouting shardRouting = indexShard.routingEntry();
ShardRecoveryResponse shardRecoveryResponse = new ShardRecoveryResponse(request.shardId());

RecoveryState state;
RecoveryStatus recoveryStatus = indexShard.recoveryStatus();
RecoveryState state = indexShard.recoveryState();

if (recoveryStatus == null) {
recoveryStatus = recoveryTarget.recoveryStatus(indexShard);
if (state == null) {
state = recoveryTarget.recoveryState(indexShard);
}

if (recoveryStatus != null) {
state = recoveryStatus.recoveryState();
} else {
if (state == null) {
IndexShardGatewayService gatewayService =
indexService.shardInjector(request.shardId().id()).getInstance(IndexShardGatewayService.class);
state = gatewayService.recoveryState();
Expand All @@ -183,7 +178,8 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, RecoveryRe

static class ShardRecoveryRequest extends BroadcastShardOperationRequest {

ShardRecoveryRequest() { }
ShardRecoveryRequest() {
}

ShardRecoveryRequest(ShardId shardId, RecoveryRequest request) {
super(shardId, request);
Expand Down
Expand Up @@ -62,6 +62,7 @@ public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSe
this.shardGateway = shardGateway;
this.snapshotService = snapshotService;
this.recoveryState = new RecoveryState(shardId);
this.recoveryState.setType(RecoveryState.Type.GATEWAY);
this.clusterService = clusterService;
}

Expand Down
Expand Up @@ -89,7 +89,7 @@
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -146,7 +146,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private volatile ScheduledFuture mergeScheduleFuture;
private volatile ShardRouting shardRouting;

private RecoveryStatus recoveryStatus;
@Nullable
private RecoveryState recoveryState;

private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings();

Expand Down Expand Up @@ -733,15 +734,15 @@ public void performRecoveryPrepareForTranslog() throws ElasticsearchException {
}

/**
* The peer recovery status if this shard recovered from a peer shard.
* The peer recovery state if this shard recovered from a peer shard, null o.w.
*/
public RecoveryStatus recoveryStatus() {
return this.recoveryStatus;
public RecoveryState recoveryState() {
return this.recoveryState;
}

public void performRecoveryFinalization(boolean withFlush, RecoveryStatus recoveryStatus) throws ElasticsearchException {
public void performRecoveryFinalization(boolean withFlush, RecoveryState recoveryState) throws ElasticsearchException {
performRecoveryFinalization(withFlush);
this.recoveryStatus = recoveryStatus;
this.recoveryState = recoveryState;
}

public void performRecoveryFinalization(boolean withFlush) throws ElasticsearchException {
Expand Down
Expand Up @@ -61,9 +61,10 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.*;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.HashMap;
Expand Down Expand Up @@ -559,19 +560,18 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) throws Ela
boolean shardHasBeenRemoved = false;
if (currentRoutingEntry.initializing() && shardRouting.initializing() && !currentRoutingEntry.equals(shardRouting)) {
logger.debug("[{}][{}] removing shard (different instance of it allocated on this node, current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
// cancel recovery just in case we are in recovery (its fine if we are not in recovery, it will be a noop).
recoveryTarget.cancelRecovery(indexShard);
// closing the shard will also cancel any ongoing recovery.
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
shardHasBeenRemoved = true;
} else if (isPeerRecovery(shardRouting)) {
// check if there is an existing recovery going, and if so, and the source node is not the same, cancel the recovery to restart it
RecoveryStatus recoveryStatus = recoveryTarget.recoveryStatus(indexShard);
if (recoveryStatus != null && recoveryStatus.stage() != RecoveryState.Stage.DONE) {
RecoveryState recoveryState = recoveryTarget.recoveryState(indexShard);
if (recoveryState != null && recoveryState.getStage() != RecoveryState.Stage.DONE) {
// we have an ongoing recovery, find the source based on current routing and compare them
DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
if (!recoveryStatus.sourceNode().equals(sourceNode)) {
if (!recoveryState.getSourceNode().equals(sourceNode)) {
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
recoveryTarget.cancelRecovery(indexShard);
// closing the shard will also cancel any ongoing recovery.
indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
shardHasBeenRemoved = true;
}
Expand Down Expand Up @@ -728,17 +728,7 @@ private void applyInitializingShard(final RoutingTable routingTable, final Disco
// the edge case where its mark as relocated, and we might need to roll it back...
// For replicas: we are recovering a backup from a primary
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA;
final Store store = indexShard.store();
final StartRecoveryRequest request;
store.incRef();
try {
store.failIfCorrupted();
request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(),
false, store.getMetadata().asMap(), type, recoveryIdGenerator.incrementAndGet());
} finally {
store.decRef();
}
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));
recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));

} catch (Throwable e) {
indexShard.engine().failEngine("corrupted preexisting index", e);
Expand Down Expand Up @@ -808,68 +798,41 @@ private boolean isPeerRecovery(ShardRouting shardRouting) {

private class PeerRecoveryListener implements RecoveryTarget.RecoveryListener {

private final StartRecoveryRequest request;
private final ShardRouting shardRouting;
private final IndexService indexService;
private final IndexMetaData indexMetaData;

private PeerRecoveryListener(StartRecoveryRequest request, ShardRouting shardRouting, IndexService indexService, IndexMetaData indexMetaData) {
this.request = request;
private PeerRecoveryListener(ShardRouting shardRouting, IndexService indexService, IndexMetaData indexMetaData) {
this.shardRouting = shardRouting;
this.indexService = indexService;
this.indexMetaData = indexMetaData;
}

@Override
public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(), "after recovery (replica) from node [" + request.sourceNode() + "]");
}

@Override
public void onRetryRecovery(TimeValue retryAfter, RecoveryStatus recoveryStatus) {
recoveryTarget.retryRecovery(request, retryAfter, recoveryStatus, PeerRecoveryListener.this);
}

@Override
public void onIgnoreRecovery(boolean removeShard, String reason) {
if (!removeShard) {
return;
}
synchronized (mutex) {
if (indexService.hasShard(shardRouting.shardId().id())) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] removing shard on ignored recovery, reason [{}]", shardRouting.index(), shardRouting.shardId().id(), reason);
}
try {
indexService.removeShard(shardRouting.shardId().id(), "ignore recovery: " + reason);
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to delete shard after ignore recovery", e1, indexService.index().name(), shardRouting.shardId().id());
}
}
}
public void onRecoveryDone(RecoveryState state) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(), "after recovery (replica) from node [" + state.getSourceNode() + "]");
}

@Override
public void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure) {
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
handleRecoveryFailure(indexService, indexMetaData, shardRouting, sendShardFailure, e);
}
}

private void handleRecoveryFailure(IndexService indexService, IndexMetaData indexMetaData, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
logger.warn("[{}][{}] failed to start shard", failure, indexService.index().name(), shardRouting.shardId().id());
synchronized (mutex) {
if (indexService.hasShard(shardRouting.shardId().id())) {
try {
logger.debug("[{}][{}] removing shard on failed recovery [{}]", shardRouting.index(), shardRouting.shardId().id(), failure.getMessage());
indexService.removeShard(shardRouting.shardId().id(), "recovery failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to delete shard after failed startup", e1, indexService.index().name(), shardRouting.shardId().id());
logger.warn("[{}][{}] failed to delete shard after recovery failure", e1, indexService.index().name(), shardRouting.shardId().id());
}
}
if (sendShardFailure) {
logger.warn("[{}][{}] sending failed shard after recovery failure", failure, indexService.index().name(), shardRouting.shardId().id());
try {
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(), "Failed to start shard, message [" + detailedMessage(failure) + "]");
Expand Down

0 comments on commit 24bc8d3

Please sign in to comment.