Use a fresh recovery id when retrying recoveries #22325
Conversation
Thanks Yannick. I left some initial feedback.
this(copyFrom.indexShard, copyFrom.sourceNode, copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId,
    copyFrom.ensureClusterStateVersionCallback);
}
private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();

public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener,
I think we can remove the private constructor?
throw new TimeoutException("timed out while waiting on previous recovery attempt to release resources");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
This one always gets me: we already deal with the interrupt here, i.e., we throw an exception, so why restore the thread's interrupt flag?
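A minimal sketch (illustrative names, not the Elasticsearch code) of why the flag is restored even though we throw: catching `InterruptedException` clears the thread's interrupt status, so re-setting it preserves the interruption for callers further up the stack that only check `Thread.currentThread().isInterrupted()` rather than catching our wrapped exception.

```java
import java.util.concurrent.CountDownLatch;

public class InterruptExample {
    static void awaitOrFail(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interruption for callers
            throw new RuntimeException("interrupted while waiting", e);
        }
    }

    // returns true if the interrupt status is still visible after the catch block
    static boolean interruptSurvives() throws InterruptedException {
        final boolean[] survived = new boolean[1];
        Thread t = new Thread(() -> {
            try {
                awaitOrFail(new CountDownLatch(1)); // never counted down; waits until interrupted
            } catch (RuntimeException e) {
                survived[0] = Thread.currentThread().isInterrupted();
            }
        });
        t.start();
        t.interrupt();
        t.join();
        return survived[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupt flag restored: " + interruptSurvives());
    }
}
```

Without the `interrupt()` call in the catch block, `survived[0]` would be false: the interruption would be silently swallowed by the exception translation.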
@@ -310,6 +337,7 @@ protected void closeInternal() {
        store.decRef();
        indexShard.recoveryStats().decCurrentAsTarget();
    }
    closedLatch.countDown();
hmm.. I think we want this in the finally clause?
I was undecided about that one. I wondered if we should only consider successful closing as proper closing for the latch. Looking at it again: as we catch all the exceptions anyhow (IndexOutput.close() is wrapped by catch (Exception) and the store is deleted with deleteQuiet), there might not be a difference between the two. I'll move it to the finally clause.
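A hypothetical sketch of the pattern being agreed on here (class and method names are illustrative): counting the latch down in a finally block guarantees that a waiter in resetRecovery() is released even if freeing resources throws.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CloseLatchExample {
    private final CountDownLatch closedLatch = new CountDownLatch(1);

    void closeInternal() {
        try {
            releaseResources(); // may fail
        } finally {
            closedLatch.countDown(); // always signal waiters, even on failure
        }
    }

    void releaseResources() {
        throw new RuntimeException("simulated failure while freeing resources");
    }

    boolean waitForClose() throws InterruptedException {
        return closedLatch.await(1, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CloseLatchExample target = new CloseLatchExample();
        try {
            target.closeInternal();
        } catch (RuntimeException expected) {
            // cleanup failed, but the latch was still released
        }
        if (!target.waitForClose()) throw new AssertionError("latch not released");
        System.out.println("latch released despite cleanup failure");
    }
}
```

If countDown() stayed outside the finally block, a cleanup failure would leave the waiter blocked until its timeout.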
logger.trace(
    (Supplier<?>) () -> new ParameterizedMessage(
        "will retry recovery with id [{}] in [{}]", recoveryTarget.recoveryId(), retryAfter), reason);
retryRecovery(recoveryTarget, retryAfter, currentRequest);
    "will retry recovery with id [{}] in [{}]", recoveryRef.status().recoveryId(), retryAfter), reason);
Should we change the status() field to target()?
}

recoveryRef.close(); // close the reference held by RecoveryRunner
We discussed an alternative that avoids having to close this reference here: make doRecovery take a recoveryId instead.
    "shard id mismatch, expected " + shardId + " but was " + oldRecoveryTarget.shardId();

newRecoveryTarget = oldRecoveryTarget.retryCopy();
RecoveryTarget existingTarget = onGoingRecoveries.putIfAbsent(newRecoveryTarget.recoveryId(), newRecoveryTarget);
we need to have a new monitor here..
}

recoveryRef.close(); // close the reference held by RecoveryRunner
oldRecoveryTarget.resetRecovery(retryCleanupTimeout); // Closes the current recovery target
We talked about an option to run this under the cancellable threads of the new target, so that we rely on its monitor to cancel this if it takes too long. That would save us the retryCleanupTimeout setting.
Thx @ywelsch. I left some very minor comments; I think we're almost there. Btw, I think we mentioned something about having a test, but I don't see it? Should we also link to the issue that started all this?
indexShard.performRecoveryRestart();
try {
    indexShard.performRecoveryRestart();
} catch (IOException ioe) {
I believe you can run this via CancellableThreads.executeIO, which means we won't need this exception juggling.
@@ -188,20 +185,13 @@ public void renameAllTempFiles() throws IOException {
/**
 * Closes the current recovery target and waits up to a certain timeout for resources to be freed
 */
void resetRecovery(TimeValue timeout) throws IOException, TimeoutException {
void resetRecovery() throws InterruptedException {
    ensureRefCount();
    if (finished.compareAndSet(false, true)) {
I think we should return a boolean to indicate whether this actually worked. I'm afraid that someone called the fail method concurrently, in which case the listener is notified and we can't reset the recovery any more. This means that if the caller of this method gets a false back, it should cancel the new recovery.
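A hypothetical sketch of the race being described (not the actual RecoveryTarget code): both reset and fail race on the same compareAndSet of the `finished` flag, so only one of them can win. Returning the compareAndSet result tells the caller whether a concurrent fail() got there first.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ResetRaceExample {
    private final AtomicBoolean finished = new AtomicBoolean(false);

    // returns false when a concurrent fail() already completed this target
    boolean resetRecovery() {
        return finished.compareAndSet(false, true);
    }

    void fail() {
        if (finished.compareAndSet(false, true)) {
            // notify the listener exactly once; the target can no longer be reset
        }
    }

    public static void main(String[] args) {
        ResetRaceExample target = new ResetRaceExample();
        target.fail();                            // concurrent failure wins the race
        boolean reset = target.resetRecovery();
        System.out.println("reset succeeded: " + reset); // false: caller must cancel the retry
    }
}
```

The compareAndSet guarantees exactly one state transition, so the listener is never notified twice and the caller can react to a lost race by cancelling the new recovery.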
onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(),
    new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true);
private void doRecovery(final long recoveryId) {
    RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId);
We should release the reference. Can we do all the prep work under a try-with-resources, up to and including the creation of the start recovery request? I believe we only need the recovery id and the cancellable threads after that, so we can capture them within the block. It will be easier to follow.
The RecoveryTarget is needed for one more thing: recoveryTarget.indexShard().prepareForIndexRecovery(), which moves the recovery stage from INIT to INDEX. I've moved this call further up and made it fail the recovery/shard if this step fails. Previously it would retry in this situation and reset the recovery stage back to INIT, but now that double recoveries are forbidden, I think we should fail early on this.
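A minimal sketch (illustrative names, not the real RecoveryRef class) of the try-with-resources shape suggested above: the reference is released automatically once the prep work is done, and only plain values such as the recovery id escape the block.

```java
public class RecoveryRefExample {
    // stand-in for a ref-counted handle on an ongoing recovery
    static class RecoveryRef implements AutoCloseable {
        final long recoveryId;
        boolean closed;
        RecoveryRef(long recoveryId) { this.recoveryId = recoveryId; }
        @Override public void close() { closed = true; } // release the reference
    }

    public static void main(String[] args) {
        RecoveryRef ref = new RecoveryRef(42L);
        final long recoveryId;
        try (RecoveryRef r = ref) {
            // ... prepare the shard, list local files, build the start-recovery request ...
            recoveryId = r.recoveryId; // capture what is still needed after release
        }
        // the reference is released here; later steps use only the captured id
        if (!ref.closed) throw new AssertionError("reference not released");
        System.out.println("released=" + ref.closed + ", id=" + recoveryId);
    }
}
```

This keeps the lifetime of the reference visually scoped to the prep work, instead of relying on a manual close() call on every exit path.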
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
closedLatch.await();
RecoveryState.Stage stage = indexShard.recoveryState().getStage();
if (indexShard.recoveryState().getPrimary() && (stage == RecoveryState.Stage.FINALIZE || stage == RecoveryState.Stage.DONE)) {
Can we add something to the PR description about this? It's subtle but important imo.
sure
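A hypothetical sketch of the guard discussed in this thread (enum and method names are illustrative): once a primary relocation has moved past finalization, the relocation source may already be indexing into this shard as primary, so a retry must fail the recovery rather than reset the target.

```java
public class StageGuardExample {
    // mirrors the shape of RecoveryState.Stage for illustration
    enum Stage { INIT, INDEX, TRANSLOG, FINALIZE, DONE }

    // resetting is forbidden for a primary that reached FINALIZE or DONE,
    // because the source may have moved to RELOCATED and started indexing here
    static boolean canResetRecovery(boolean primary, Stage stage) {
        return !(primary && (stage == Stage.FINALIZE || stage == Stage.DONE));
    }

    public static void main(String[] args) {
        if (canResetRecovery(true, Stage.FINALIZE)) throw new AssertionError();
        if (canResetRecovery(true, Stage.DONE)) throw new AssertionError();
        if (!canResetRecovery(true, Stage.INDEX)) throw new AssertionError();
        if (!canResetRecovery(false, Stage.DONE)) throw new AssertionError();
        System.out.println("stage guard behaves as expected");
    }
}
```

Resetting in that late stage could halt acknowledged indexing and destroy documents already written to the target, which is why the guard fails the retry instead.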
LGTM. Thx @ywelsch
Thanks @bleskes
Recoveries are tracked on the target node using RecoveryTarget objects that are kept in a RecoveriesCollection. Each recovery has a unique id that is communicated from the recovery target to the source so that it can call back to the target and execute actions using the right recovery context. In case of a network disconnect, recoveries are retried. At the moment, the same recovery id is reused for the restarted recovery. This can lead to confusion though if the disconnect is unilateral and the recovery source continues with the recovery process. If the target reuses the same recovery id while doing a second attempt, there might be two concurrent recoveries running on the source for the same target. This commit changes the recovery retry process to use a fresh recovery id. It also waits for the first recovery attempt to be fully finished (all resources locally freed) to further prevent concurrent access to the shard. Finally, in case of primary relocation, it also fails a second recovery attempt if the first attempt moved past the finalization step, as the relocation source can then be moved to RELOCATED state and start indexing as primary into the target shard (see TransportReplicationAction). Resetting the target shard in this state could mean that indexing is halted until the recovery retry attempt is completed and could also destroy existing documents indexed and acknowledged before the reset. Relates to #22043
Resetting a recovery consists of resetting the old recovery target and replacing it by a new recovery target object. This is done on the Cancellable threads of the new recovery target. If the new recovery target is already cancelled before or while this happens, for example due to shard closing or recovery source changing, we have to make sure that the old recovery target object frees all shard resources. Relates to #22325
I've pushed 816e1c6 which fixes an issue uncovered by a CI run:
#22325 changed the recovery retry logic to use unique recovery ids. The change also introduced an issue, however, which made it possible for the shard store to be closed under CancellableThreads, triggering assertions in the node locking logic. This commit limits the use of CancellableThreads only to the part where we wait on the old recovery target to be closed.
Recoveries are tracked on the target node using RecoveryTarget objects that are kept in a RecoveriesCollection. Each recovery has a unique id that is communicated from the recovery target to the source so that it can call back to the target and execute actions using the right recovery context. In case of a network disconnect, recoveries are retried. At the moment, the same recovery id is reused for the restarted recovery. This can lead to confusion though if the disconnect is unilateral and the recovery source continues with the recovery process. If the target reuses the same recovery id while doing a second attempt, there might be two concurrent recoveries running on the source for the same target.
This PR changes the recovery retry process to use a fresh recovery id. It also waits for the first recovery attempt to be fully finished (all resources locally freed) to further prevent concurrent access to the shard. Finally, in case of primary relocation, it also fails a second recovery attempt if the first attempt moved past the finalization step, as the relocation source can then be moved to RELOCATED state and start indexing as primary into the target shard (see TransportReplicationAction). Resetting the target shard in this state could mean that indexing is halted until the recovery retry attempt is completed and could also destroy existing documents indexed and acknowledged before the reset.
Relates to #22043
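The core idea of the PR can be sketched in a few lines (class and method names here are illustrative, not the actual RecoveriesCollection API): every retry registers a brand-new id in the collection, so a stale callback from the recovery source using the old id can no longer find a live target.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class FreshIdExample {
    static final AtomicLong idGenerator = new AtomicLong();
    static final ConcurrentMap<Long, String> onGoingRecoveries = new ConcurrentHashMap<>();

    static long startRecovery(String target) {
        long id = idGenerator.incrementAndGet(); // fresh id for every attempt
        onGoingRecoveries.put(id, target);
        return id;
    }

    static long retryRecovery(long oldId) {
        String target = onGoingRecoveries.remove(oldId); // old id is retired first
        return startRecovery(target);                    // retry runs under a new id
    }

    public static void main(String[] args) {
        long first = startRecovery("shard[0]");
        long second = retryRecovery(first);
        if (first == second) throw new AssertionError("retry must use a fresh id");
        if (onGoingRecoveries.containsKey(first)) throw new AssertionError("old id still live");
        System.out.println("old id " + first + " retired, new id " + second);
    }
}
```

Because the old id is removed before the retry starts, a source that kept running after a unilateral disconnect cannot address the new attempt, which is what prevents two concurrent recoveries for the same target.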