Skip to content

Commit

Permalink
Use a fresh recovery id when retrying recoveries (#22325)
Browse files Browse the repository at this point in the history
Recoveries are tracked on the target node using RecoveryTarget objects that are kept in a RecoveriesCollection. Each recovery has a unique id that is communicated from the recovery target to the source so that it can call back to the target and execute actions using the right recovery context. In case of a network disconnect, recoveries are retried. At the moment, the same recovery id is reused for the restarted recovery. This can lead to confusion though if the disconnect is unilateral and the recovery source continues with the recovery process. If the target reuses the same recovery id while doing a second attempt, there might be two concurrent recoveries running on the source for the same target.

This commit changes the recovery retry process to use a fresh recovery id. It also waits for the first recovery attempt to be fully finished (all resources locally freed) to further prevent concurrent access to the shard. Finally, in case of primary relocation, it also fails a second recovery attempt if the first attempt moved past the finalization step, as the relocation source can then be moved to RELOCATED state and start indexing as primary into the target shard (see TransportReplicationAction). Resetting the target shard in this state could mean that indexing is halted until the recovery retry attempt is completed and could also destroy existing documents indexed and acknowledged before the reset.

Relates to #22043
  • Loading branch information
ywelsch committed Dec 29, 2016
1 parent ca90d9e commit 6e6d9eb
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 163 deletions.

Large diffs are not rendered by default.

Expand Up @@ -33,7 +33,9 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -65,36 +67,67 @@ public RecoveriesCollection(Logger logger, ThreadPool threadPool, Callback<Long>
*/
public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode,
PeerRecoveryTargetService.RecoveryListener listener, TimeValue activityTimeout) {
RecoveryTarget status = new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);
RecoveryTarget existingStatus = onGoingRecoveries.putIfAbsent(status.recoveryId(), status);
assert existingStatus == null : "found two RecoveryStatus instances with the same id";
logger.trace("{} started recovery from {}, id [{}]", indexShard.shardId(), sourceNode, status.recoveryId());
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);
startRecoveryInternal(recoveryTarget, activityTimeout);
return recoveryTarget.recoveryId();
}

private void startRecoveryInternal(RecoveryTarget recoveryTarget, TimeValue activityTimeout) {
RecoveryTarget existingTarget = onGoingRecoveries.putIfAbsent(recoveryTarget.recoveryId(), recoveryTarget);
assert existingTarget == null : "found two RecoveryStatus instances with the same id";
logger.trace("{} started recovery from {}, id [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode(),
recoveryTarget.recoveryId());
threadPool.schedule(activityTimeout, ThreadPool.Names.GENERIC,
new RecoveryMonitor(status.recoveryId(), status.lastAccessTime(), activityTimeout));
return status.recoveryId();
new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout));
}


/**
* Resets the recovery and performs a recovery restart on the currently recovering index shard
*
* @see IndexShard#performRecoveryRestart()
* @return newly created RecoveryTarget
*/
public void resetRecovery(long id, ShardId shardId) throws IOException {
try (RecoveryRef ref = getRecoverySafe(id, shardId)) {
// instead of adding complicated state to RecoveryTarget we just flip the
// target instance when we reset a recovery, that way we have only one cleanup
// path on the RecoveryTarget and are always within the bounds of ref-counting
// which is important since we verify files are on disk etc. after we have written them etc.
RecoveryTarget status = ref.status();
RecoveryTarget resetRecovery = status.resetRecovery();
if (onGoingRecoveries.replace(id, status, resetRecovery) == false) {
resetRecovery.cancel("replace failed"); // this is important otherwise we leak a reference to the store
throw new IllegalStateException("failed to replace recovery target");
public RecoveryTarget resetRecovery(final long recoveryId, TimeValue activityTimeout) {
RecoveryTarget oldRecoveryTarget = null;
final RecoveryTarget newRecoveryTarget;

try {
synchronized (onGoingRecoveries) {
// swap recovery targets in a synchronized block to ensure that the newly added recovery target is picked up by
// cancelRecoveriesForShard whenever the old recovery target is picked up
oldRecoveryTarget = onGoingRecoveries.remove(recoveryId);
if (oldRecoveryTarget == null) {
return null;
}

newRecoveryTarget = oldRecoveryTarget.retryCopy();
startRecoveryInternal(newRecoveryTarget, activityTimeout);
}

// Closes the current recovery target
final RecoveryTarget finalOldRecoveryTarget = oldRecoveryTarget;
final AtomicBoolean successfulReset = new AtomicBoolean();
newRecoveryTarget.CancellableThreads().executeIO(() -> successfulReset.set(finalOldRecoveryTarget.resetRecovery()));
if (successfulReset.get() == false) {
cancelRecovery(newRecoveryTarget.recoveryId(), "failed to reset recovery");
return null;
} else {
logger.trace("{} restarted recovery from {}, id [{}], previous id [{}]", newRecoveryTarget.shardId(),
newRecoveryTarget.sourceNode(), newRecoveryTarget.recoveryId(), oldRecoveryTarget.recoveryId());
return newRecoveryTarget;
}
} catch (Exception e) {
// fail shard to be safe
oldRecoveryTarget.notifyListener(new RecoveryFailedException(oldRecoveryTarget.state(), "failed to retry recovery", e), true);
return null;
}
}

public RecoveryTarget getRecoveryTarget(long id) {
return onGoingRecoveries.get(id);
}

/**
* gets the {@link RecoveryTarget } for a given id. The RecoveryStatus returned has it's ref count already incremented
* to make sure it's safe to use. However, you must call {@link RecoveryTarget#decRef()} when you are done with it, typically
Expand All @@ -116,7 +149,7 @@ public RecoveryRef getRecoverySafe(long id, ShardId shardId) {
if (recoveryRef == null) {
throw new IndexShardClosedException(shardId);
}
assert recoveryRef.status().shardId().equals(shardId);
assert recoveryRef.target().shardId().equals(shardId);
return recoveryRef;
}

Expand All @@ -143,7 +176,8 @@ public boolean cancelRecovery(long id, String reason) {
public void failRecovery(long id, RecoveryFailedException e, boolean sendShardFailure) {
RecoveryTarget removed = onGoingRecoveries.remove(id);
if (removed != null) {
logger.trace("{} failing recovery from {}, id [{}]. Send shard failure: [{}]", removed.shardId(), removed.sourceNode(), removed.recoveryId(), sendShardFailure);
logger.trace("{} failing recovery from {}, id [{}]. Send shard failure: [{}]", removed.shardId(), removed.sourceNode(),
removed.recoveryId(), sendShardFailure);
removed.fail(e, sendShardFailure);
}
}
Expand Down Expand Up @@ -171,11 +205,22 @@ public int size() {
*/
public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
boolean cancelled = false;
for (RecoveryTarget status : onGoingRecoveries.values()) {
if (status.shardId().equals(shardId)) {
cancelled |= cancelRecovery(status.recoveryId(), reason);
List<RecoveryTarget> matchedRecoveries = new ArrayList<>();
synchronized (onGoingRecoveries) {
for (Iterator<RecoveryTarget> it = onGoingRecoveries.values().iterator(); it.hasNext(); ) {
RecoveryTarget status = it.next();
if (status.shardId().equals(shardId)) {
matchedRecoveries.add(status);
it.remove();
}
}
}
for (RecoveryTarget removed : matchedRecoveries) {
logger.trace("{} canceled recovery from {}, id [{}] (reason [{}])",
removed.shardId(), removed.sourceNode(), removed.recoveryId(), reason);
removed.cancel(reason);
cancelled = true;
}
return cancelled;
}

Expand Down Expand Up @@ -205,7 +250,7 @@ public void close() {
}
}

public RecoveryTarget status() {
public RecoveryTarget target() {
return status;
}
}
Expand Down
Expand Up @@ -69,7 +69,7 @@ public class RecoverySettings extends AbstractComponent {
*/
public static final Setting<TimeValue> INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING =
Setting.timeSetting("indices.recovery.recovery_activity_timeout",
INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING::get, TimeValue.timeValueSeconds(0),
INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING::get, TimeValue.timeValueSeconds(0),
Property.Dynamic, Property.NodeScope);

public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
Expand Down
Expand Up @@ -148,6 +148,8 @@ public RecoveryResponse recoverToTarget() throws IOException {

// engine was just started at the end of phase 1
if (shard.state() == IndexShardState.RELOCATED) {
assert request.isPrimaryRelocation() == false :
"recovery target should not retry primary relocation if previous attempt made it past finalization step";
/**
* The primary shard has been relocated while we copied files. This means that we can't guarantee any more that all
* operations that were replicated during the file copy (when the target engine was not yet opened) will be present in the
Expand Down
Expand Up @@ -55,6 +55,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -87,17 +88,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
// last time this status was accessed
private volatile long lastAccessTime = System.nanoTime();

private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
// latch that can be used to blockingly wait for RecoveryTarget to be closed
private final CountDownLatch closedLatch = new CountDownLatch(1);

private RecoveryTarget(RecoveryTarget copyFrom) { // copy constructor
this(copyFrom.indexShard, copyFrom.sourceNode, copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId,
copyFrom.ensureClusterStateVersionCallback);
}
private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();

public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener,
Callback<Long> ensureClusterStateVersionCallback) {
this(indexShard, sourceNode, listener, new CancellableThreads(), idGenerator.incrementAndGet(), ensureClusterStateVersionCallback);
}
/**
* creates a new recovery target object that represents a recovery to the provided indexShard
*
Expand All @@ -108,11 +103,11 @@ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecov
* version. Necessary for primary relocation so that new primary knows about all other ongoing
* replica recoveries when replicating documents (see {@link RecoverySourceHandler}).
*/
private RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener,
CancellableThreads cancellableThreads, long recoveryId, Callback<Long> ensureClusterStateVersionCallback) {
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener,
Callback<Long> ensureClusterStateVersionCallback) {
super("recovery_status");
this.cancellableThreads = cancellableThreads;
this.recoveryId = recoveryId;
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
this.listener = listener;
this.logger = Loggers.getLogger(getClass(), indexShard.indexSettings().getSettings(), indexShard.shardId());
this.indexShard = indexShard;
Expand All @@ -126,6 +121,13 @@ private RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerReco
indexShard.recoveryStats().incCurrentAsTarget();
}

/**
* returns a fresh RecoveryTarget to retry recovery from the same source node onto the same IndexShard and using the same listener
*/
public RecoveryTarget retryCopy() {
return new RecoveryTarget(this.indexShard, this.sourceNode, this.listener, this.ensureClusterStateVersionCallback);
}

public long recoveryId() {
return recoveryId;
}
Expand Down Expand Up @@ -177,19 +179,28 @@ public void renameAllTempFiles() throws IOException {
}

/**
* Closes the current recovery target and returns a
* clone to reset the ongoing recovery.
* Note: the returned target must be canceled, failed or finished
* in order to release all it's reference.
* Closes the current recovery target and waits up to a certain timeout for resources to be freed.
* Returns true if resetting the recovery was successful, false if the recovery target is already cancelled / failed or marked as done.
*/
RecoveryTarget resetRecovery() throws IOException {
ensureRefCount();
boolean resetRecovery() throws InterruptedException, IOException {
if (finished.compareAndSet(false, true)) {
logger.debug("reset of recovery with shard {} and id [{}]", shardId, recoveryId);
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
closedLatch.await();
RecoveryState.Stage stage = indexShard.recoveryState().getStage();
if (indexShard.recoveryState().getPrimary() && (stage == RecoveryState.Stage.FINALIZE || stage == RecoveryState.Stage.DONE)) {
// once primary relocation has moved past the finalization step, the relocation source can be moved to RELOCATED state
// and start indexing as primary into the target shard (see TransportReplicationAction). Resetting the target shard in this
// state could mean that indexing is halted until the recovery retry attempt is completed and could also destroy existing
// documents indexed and acknowledged before the reset.
assert stage != RecoveryState.Stage.DONE : "recovery should not have completed when it's being reset";
throw new IllegalStateException("cannot reset recovery as previous attempt made it past finalization step");
}
indexShard.performRecoveryRestart();
return true;
}
indexShard.performRecoveryRestart();
return new RecoveryTarget(this);
return false;
}

/**
Expand Down Expand Up @@ -220,7 +231,7 @@ public void cancel(String reason) {
public void fail(RecoveryFailedException e, boolean sendShardFailure) {
if (finished.compareAndSet(false, true)) {
try {
listener.onRecoveryFailure(state(), e, sendShardFailure);
notifyListener(e, sendShardFailure);
} finally {
try {
cancellableThreads.cancel("failed recovery [" + ExceptionsHelper.stackTrace(e) + "]");
Expand All @@ -232,6 +243,10 @@ public void fail(RecoveryFailedException e, boolean sendShardFailure) {
}
}

public void notifyListener(RecoveryFailedException e, boolean sendShardFailure) {
listener.onRecoveryFailure(state(), e, sendShardFailure);
}

/** mark the current recovery as done */
public void markAsDone() {
if (finished.compareAndSet(false, true)) {
Expand Down Expand Up @@ -309,6 +324,7 @@ protected void closeInternal() {
// free store. increment happens in constructor
store.decRef();
indexShard.recoveryStats().decCurrentAsTarget();
closedLatch.countDown();
}
}

Expand Down

0 comments on commit 6e6d9eb

Please sign in to comment.