Skip to content

Commit

Permalink
Do not send recovery requests with CancellableThreads (#46287)
Browse files Browse the repository at this point in the history
Previously, we send recovery requests using CancellableThreads because
we send requests and wait for responses in a blocking manner. With async
recovery, we no longer need to do so. Moreover, if we fail to submit a
request, then we can release the Store using an interruptible thread
which can risk invalidating the node lock.

This PR is the first step to avoid forking when releasing the Store.

Relates #45409
Relates #46178
  • Loading branch information
dnhatn committed Sep 4, 2019
1 parent c43ad1e commit b38f464
Showing 1 changed file with 36 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,9 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> creat
final StepListener<Void> sendFilesStep = new StepListener<>();
final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
final StepListener<Void> cleanFilesStep = new StepListener<>();
cancellableThreads.execute(() ->
recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep));
cancellableThreads.checkForCancel();
recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep);

sendFileInfoStep.whenComplete(r ->
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure);
Expand Down Expand Up @@ -634,8 +634,8 @@ void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> li
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
logger.trace("recovery [phase1]: prepare remote engine for translog");
cancellableThreads.execute(() ->
recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener));
cancellableThreads.checkForCancel();
recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener);
}

/**
Expand Down Expand Up @@ -741,30 +741,29 @@ private void sendBatch(
final List<Translog.Operation> operations = nextBatch.get();
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
if (operations.isEmpty() == false || firstBatch) {
cancellableThreads.execute(() -> {
recoveryTarget.indexTranslogOperations(
operations,
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
ActionListener.wrap(
newCheckpoint -> {
sendBatch(
nextBatch,
false,
SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
listener);
},
listener::onFailure
));
});
cancellableThreads.checkForCancel();
recoveryTarget.indexTranslogOperations(
operations,
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
ActionListener.wrap(
newCheckpoint -> {
sendBatch(
nextBatch,
false,
SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
listener);
},
listener::onFailure
));
} else {
listener.onResponse(targetLocalCheckpoint);
}
Expand All @@ -787,7 +786,8 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery
final StepListener<Void> finalizeListener = new StepListener<>();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener));
cancellableThreads.checkForCancel();
recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener);
finalizeListener.whenComplete(r -> {
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
Expand Down Expand Up @@ -894,8 +894,9 @@ protected FileChunk nextChunkRequest(StoreFileMetaData md) throws IOException {

@Override
protected void sendChunkRequest(FileChunk request, ActionListener<Void> listener) {
cancellableThreads.execute(() -> recoveryTarget.writeFileChunk(
request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener));
cancellableThreads.checkForCancel();
recoveryTarget.writeFileChunk(
request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener);
}

@Override
Expand All @@ -922,13 +923,14 @@ private void cleanFiles(Store store, Store.MetadataSnapshot sourceMetadata, IntS
// Once the files have been renamed, any other files that are not
// related to this recovery (out of date segments, for example)
// are deleted
cancellableThreads.execute(() -> recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
cancellableThreads.checkForCancel();
recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> {
StoreFileMetaData[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetaData[]::new);
ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
handleErrorOnSendFiles(store, e, mds);
throw e;
}))));
})));
}

private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetaData[] mds) throws Exception {
Expand Down

0 comments on commit b38f464

Please sign in to comment.