Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor RecoveryTarget state management #8092

Closed
wants to merge 11 commits into from
Expand Up @@ -40,7 +40,6 @@
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -144,19 +143,15 @@ protected ShardRecoveryResponse shardOperation(ShardRecoveryRequest request) thr

InternalIndexService indexService = (InternalIndexService) indicesService.indexServiceSafe(request.shardId().getIndex());
InternalIndexShard indexShard = (InternalIndexShard) indexService.shardSafe(request.shardId().id());
ShardRouting shardRouting = indexShard.routingEntry();
ShardRecoveryResponse shardRecoveryResponse = new ShardRecoveryResponse(request.shardId());

RecoveryState state;
RecoveryStatus recoveryStatus = indexShard.recoveryStatus();
RecoveryState state = indexShard.recoveryState();

if (recoveryStatus == null) {
recoveryStatus = recoveryTarget.recoveryStatus(indexShard);
if (state == null) {
state = recoveryTarget.recoveryState(indexShard);
}

if (recoveryStatus != null) {
state = recoveryStatus.recoveryState();
} else {
if (state == null) {
IndexShardGatewayService gatewayService =
indexService.shardInjector(request.shardId().id()).getInstance(IndexShardGatewayService.class);
state = gatewayService.recoveryState();
Expand All @@ -183,7 +178,8 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, RecoveryRe

static class ShardRecoveryRequest extends BroadcastShardOperationRequest {

ShardRecoveryRequest() { }
ShardRecoveryRequest() {
}

ShardRecoveryRequest(ShardId shardId, RecoveryRequest request) {
super(shardId, request);
Expand Down
Expand Up @@ -180,13 +180,13 @@ protected ShardStatus shardOperation(IndexShardStatusRequest request) throws Ela

if (request.recovery) {
// check on going recovery (from peer or gateway)
RecoveryStatus peerRecoveryStatus = indexShard.recoveryStatus();
if (peerRecoveryStatus == null) {
peerRecoveryStatus = peerRecoveryTarget.recoveryStatus(indexShard);
RecoveryState peerRecoveryState = indexShard.recoveryState();
if (peerRecoveryState == null) {
peerRecoveryState = peerRecoveryTarget.recoveryState(indexShard);
}
if (peerRecoveryStatus != null) {
if (peerRecoveryState != null) {
PeerRecoveryStatus.Stage stage;
switch (peerRecoveryStatus.stage()) {
switch (peerRecoveryState.getStage()) {
case INIT:
stage = PeerRecoveryStatus.Stage.INIT;
break;
Expand All @@ -205,11 +205,11 @@ protected ShardStatus shardOperation(IndexShardStatusRequest request) throws Ela
default:
stage = PeerRecoveryStatus.Stage.INIT;
}
shardStatus.peerRecoveryStatus = new PeerRecoveryStatus(stage, peerRecoveryStatus.recoveryState().getTimer().startTime(),
peerRecoveryStatus.recoveryState().getTimer().time(),
peerRecoveryStatus.recoveryState().getIndex().totalByteCount(),
peerRecoveryStatus.recoveryState().getIndex().reusedByteCount(),
peerRecoveryStatus.recoveryState().getIndex().recoveredByteCount(), peerRecoveryStatus.recoveryState().getTranslog().currentTranslogOperations());
shardStatus.peerRecoveryStatus = new PeerRecoveryStatus(stage, peerRecoveryState.getTimer().startTime(),
peerRecoveryState.getTimer().time(),
peerRecoveryState.getIndex().totalByteCount(),
peerRecoveryState.getIndex().reusedByteCount(),
peerRecoveryState.getIndex().recoveredByteCount(), peerRecoveryState.getTranslog().currentTranslogOperations());
}

IndexShardGatewayService gatewayService = indexService.shardInjector(request.shardId().id()).getInstance(IndexShardGatewayService.class);
Expand Down
Expand Up @@ -62,6 +62,7 @@ public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSe
this.shardGateway = shardGateway;
this.snapshotService = snapshotService;
this.recoveryState = new RecoveryState(shardId);
this.recoveryState.setType(RecoveryState.Type.GATEWAY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this go into the constructor?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, it gets updated.

this.clusterService = clusterService;
}

Expand Down
Expand Up @@ -89,7 +89,7 @@
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -146,7 +146,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private volatile ScheduledFuture mergeScheduleFuture;
private volatile ShardRouting shardRouting;

private RecoveryStatus recoveryStatus;
@Nullable
private RecoveryState recoveryState;

private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings();

Expand Down Expand Up @@ -733,15 +734,15 @@ public void performRecoveryPrepareForTranslog() throws ElasticsearchException {
}

/**
* The peer recovery status if this shard recovered from a peer shard.
* The peer recovery state if this shard recovered from a peer shard, null o.w.
*/
public RecoveryStatus recoveryStatus() {
return this.recoveryStatus;
public RecoveryState recoveryState() {
return this.recoveryState;
}

public void performRecoveryFinalization(boolean withFlush, RecoveryStatus recoveryStatus) throws ElasticsearchException {
public void performRecoveryFinalization(boolean withFlush, RecoveryState recoveryState) throws ElasticsearchException {
performRecoveryFinalization(withFlush);
this.recoveryStatus = recoveryStatus;
this.recoveryState = recoveryState;
}

public void performRecoveryFinalization(boolean withFlush) throws ElasticsearchException {
Expand Down
Expand Up @@ -61,9 +61,10 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.*;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.HashMap;
Expand Down Expand Up @@ -559,19 +560,18 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) throws Ela
boolean shardHasBeenRemoved = false;
if (currentRoutingEntry.initializing() && shardRouting.initializing() && !currentRoutingEntry.equals(shardRouting)) {
logger.debug("[{}][{}] removing shard (different instance of it allocated on this node, current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
// cancel recovery just in case we are in recovery (its fine if we are not in recovery, it will be a noop).
recoveryTarget.cancelRecovery(indexShard);
// closing the shard will also cancel any ongoing recovery.
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
shardHasBeenRemoved = true;
} else if (isPeerRecovery(shardRouting)) {
// check if there is an existing recovery going, and if so, and the source node is not the same, cancel the recovery to restart it
RecoveryStatus recoveryStatus = recoveryTarget.recoveryStatus(indexShard);
if (recoveryStatus != null && recoveryStatus.stage() != RecoveryState.Stage.DONE) {
RecoveryState recoveryState = recoveryTarget.recoveryState(indexShard);
if (recoveryState != null && recoveryState.getStage() != RecoveryState.Stage.DONE) {
// we have an ongoing recovery, find the source based on current routing and compare them
DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
if (!recoveryStatus.sourceNode().equals(sourceNode)) {
if (!recoveryState.getSourceNode().equals(sourceNode)) {
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
recoveryTarget.cancelRecovery(indexShard);
// closing the shard will also cancel any ongoing recovery.
indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
shardHasBeenRemoved = true;
}
Expand Down Expand Up @@ -728,17 +728,7 @@ private void applyInitializingShard(final RoutingTable routingTable, final Disco
// the edge case where its mark as relocated, and we might need to roll it back...
// For replicas: we are recovering a backup from a primary
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA;
final Store store = indexShard.store();
final StartRecoveryRequest request;
store.incRef();
try {
store.failIfCorrupted();
request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(),
false, store.getMetadata().asMap(), type, recoveryIdGenerator.incrementAndGet());
} finally {
store.decRef();
}
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));
recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));

} catch (Throwable e) {
indexShard.engine().failEngine("corrupted preexisting index", e);
Expand Down Expand Up @@ -808,68 +798,41 @@ private boolean isPeerRecovery(ShardRouting shardRouting) {

private class PeerRecoveryListener implements RecoveryTarget.RecoveryListener {

private final StartRecoveryRequest request;
private final ShardRouting shardRouting;
private final IndexService indexService;
private final IndexMetaData indexMetaData;

private PeerRecoveryListener(StartRecoveryRequest request, ShardRouting shardRouting, IndexService indexService, IndexMetaData indexMetaData) {
this.request = request;
private PeerRecoveryListener(ShardRouting shardRouting, IndexService indexService, IndexMetaData indexMetaData) {
this.shardRouting = shardRouting;
this.indexService = indexService;
this.indexMetaData = indexMetaData;
}

@Override
public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(), "after recovery (replica) from node [" + request.sourceNode() + "]");
}

@Override
public void onRetryRecovery(TimeValue retryAfter, RecoveryStatus recoveryStatus) {
recoveryTarget.retryRecovery(request, retryAfter, recoveryStatus, PeerRecoveryListener.this);
}

@Override
public void onIgnoreRecovery(boolean removeShard, String reason) {
if (!removeShard) {
return;
}
synchronized (mutex) {
if (indexService.hasShard(shardRouting.shardId().id())) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] removing shard on ignored recovery, reason [{}]", shardRouting.index(), shardRouting.shardId().id(), reason);
}
try {
indexService.removeShard(shardRouting.shardId().id(), "ignore recovery: " + reason);
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to delete shard after ignore recovery", e1, indexService.index().name(), shardRouting.shardId().id());
}
}
}
public void onRecoveryDone(RecoveryState state) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(), "after recovery (replica) from node [" + state.getSourceNode() + "]");
}

@Override
public void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure) {
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
handleRecoveryFailure(indexService, indexMetaData, shardRouting, sendShardFailure, e);
}
}

private void handleRecoveryFailure(IndexService indexService, IndexMetaData indexMetaData, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
logger.warn("[{}][{}] failed to start shard", failure, indexService.index().name(), shardRouting.shardId().id());
synchronized (mutex) {
if (indexService.hasShard(shardRouting.shardId().id())) {
try {
logger.debug("[{}][{}] removing shard on failed recovery [{}]", shardRouting.index(), shardRouting.shardId().id(), failure.getMessage());
indexService.removeShard(shardRouting.shardId().id(), "recovery failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to delete shard after failed startup", e1, indexService.index().name(), shardRouting.shardId().id());
logger.warn("[{}][{}] failed to delete shard after recovery failure", e1, indexService.index().name(), shardRouting.shardId().id());
}
}
if (sendShardFailure) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that onIgnoreRecovery is unused?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is now removed from the listener interface.

logger.warn("[{}][{}] sending failed shard after recovery failure", failure, indexService.index().name(), shardRouting.shardId().id());
try {
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(), "Failed to start shard, message [" + detailedMessage(failure) + "]");
Expand Down