Disable recovery monitor before recovery start (#93551)
We do a nontrivial amount of work before we start a peer recovery,
particularly recovering from the local translog up to the shard's global
checkpoint. Today the recovery monitor is running during this time, and
will (repeatedly) fail the recovery if this preliminary work takes more
than 30 minutes to complete. With this commit we disable the recovery
monitor until the local recovery step has completed.

Closes #93542
DaveCTurner committed Feb 7, 2023
1 parent 052a4f1 commit b8c9dc9
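
For context (not part of the original commit message): the recovery monitor decides whether a recovery is still alive from the target's last-access timestamp, which is why the fix below simply makes that timestamp look perpetually fresh while the monitor is disabled. Below is a simplified sketch of that liveness check, assuming a periodic monitor that compares successive lastAccessTime() readings and a timeout driven by the indices.recovery.recovery_activity_timeout setting (30 minutes by default, matching the figure above); the names checkRecoveryIsAlive and failRecovery are illustrative, not the actual implementation.

    // Hypothetical, simplified version of the monitor's periodic check.
    private long lastSeenAccessTime;

    void checkRecoveryIsAlive(RecoveryTarget recoveryTarget) {
        long accessTime = recoveryTarget.lastAccessTime();
        if (accessTime == lastSeenAccessTime) {
            // No progress observed since the previous check. Before this commit,
            // the slow pre-recovery work (replaying the local translog) could sit
            // in this state long enough to trip the 30-minute timeout and fail.
            failRecovery(recoveryTarget, "no recovery activity within the activity timeout");
        }
        lastSeenAccessTime = accessTime;
    }

With this commit, recoveryTarget.lastAccessTime() returns System.nanoTime() while the monitor is disabled, so accessTime always differs from lastSeenAccessTime and the failure branch cannot be taken during the local recovery step.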
Showing 4 changed files with 20 additions and 10 deletions.
docs/changelog/93551.yaml (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
+pr: 93551
+summary: Disable recovery monitor before recovery start
+area: Recovery
+type: bug
+issues:
+ - 93542
@@ -32,6 +32,7 @@
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.engine.RecoveryEngineException;
@@ -219,6 +220,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
         final RecoveryState recoveryState = recoveryTarget.state();
         final RecoveryState.Timer timer = recoveryState.getTimer();
         final IndexShard indexShard = recoveryTarget.indexShard();
+        final Releasable onCompletion = Releasables.wrap(recoveryTarget.disableRecoveryMonitor(), recoveryRef);

         final var failureHandler = ActionListener.notifyOnce(ActionListener.runBefore(ActionListener.noop().delegateResponse((l, e) -> {
             // this will be logged as warning later on...
@@ -228,7 +230,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
                 new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e),
                 true
             );
-        }), recoveryRef::close));
+        }), onCompletion::close));

         if (indexShard.routingEntry().isPromotableToPrimary() == false) {
             assert preExistingRequest == null;
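
The hunk above (in doRecovery, presumably in PeerRecoveryTargetService going by the hunk header) combines the monitor-disabling handle with the existing recovery reference. Simplifying the exception handling that Releasables.wrap actually performs, closing onCompletion behaves roughly like closing both delegates in order, so completing or failing the preparation step both re-enables the recovery monitor and releases the RecoveryRef:

    // Rough equivalent of the wrapped handle above (sketch; the real
    // Releasables.wrap also handles exceptions thrown by the individual closes).
    Releasable reEnableMonitor = recoveryTarget.disableRecoveryMonitor();
    Releasable onCompletion = () -> {
        reEnableMonitor.close(); // re-arm the recovery monitor's liveness checks
        recoveryRef.close();     // drop the reference taken on the RecoveryTarget
    };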
@@ -255,11 +255,11 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
     }

     /**
-     * a reference to {@link RecoveryTarget}, which implements {@link AutoCloseable}. closing the reference
+     * a reference to {@link RecoveryTarget}, which implements {@link Releasable}. closing the reference
      * causes {@link RecoveryTarget#decRef()} to be called. This makes sure that the underlying resources
      * will not be freed until {@link RecoveryRef#close()} is called.
      */
-    public static class RecoveryRef implements AutoCloseable {
+    public static class RecoveryRef implements Releasable {

         private final RecoveryTarget status;
         private final AtomicBoolean closed = new AtomicBoolean(false);
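
The switch from AutoCloseable to Releasable (in the class containing cancelRecoveriesForShard, presumably RecoveriesCollection) is what allows RecoveryRef to be passed to Releasables.wrap in the previous hunk. As a reminder of the relationship, and hedging on the exact declaration, org.elasticsearch.core.Releasable is essentially Closeable with the checked IOException removed:

    // Approximate shape of org.elasticsearch.core.Releasable (see the actual
    // source for the authoritative declaration): close() declares no checked
    // exception, so Releasables can be composed without try/catch boilerplate.
    public interface Releasable extends Closeable {
        @Override
        void close();
    }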
@@ -26,6 +26,7 @@
 import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.mapper.MapperException;
 import org.elasticsearch.index.seqno.ReplicationTracker;
@@ -47,6 +48,7 @@
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;

 import static org.elasticsearch.core.Strings.format;
@@ -80,7 +82,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
     // last time this status was accessed
     private volatile long lastAccessTime = System.nanoTime();

-    private volatile boolean recoveryMonitorEnabled = true;
+    private final AtomicInteger recoveryMonitorBlocks = new AtomicInteger();

     @Nullable // if we're not downloading files from snapshots in this recovery or we're retrying
     private volatile Releasable snapshotFileDownloadsPermit;
@@ -185,7 +187,7 @@ public boolean hasPermitToDownloadSnapshotFiles() {

     /** return the last time this RecoveryStatus was used (based on System.nanoTime() */
     public long lastAccessTime() {
-        if (recoveryMonitorEnabled) {
+        if (recoveryMonitorBlocks.get() == 0) {
             return lastAccessTime;
         }
         return System.nanoTime();
@@ -204,12 +206,11 @@ public void setLastAccessTime() {
      * @return releasable that once closed will re-enable liveness checks by the recovery monitor
      */
     public Releasable disableRecoveryMonitor() {
-        assert recoveryMonitorEnabled : "recovery monitor already disabled";
-        recoveryMonitorEnabled = false;
-        return () -> {
+        recoveryMonitorBlocks.incrementAndGet();
+        return Releasables.releaseOnce(() -> {
             setLastAccessTime();
-            recoveryMonitorEnabled = true;
-        };
+            recoveryMonitorBlocks.decrementAndGet();
+        });
     }

     public Store store() {
@@ -316,6 +317,7 @@ public void markAsDone() {

     @Override
     protected void closeInternal() {
+        assert recoveryMonitorBlocks.get() == 0;
         try {
             multiFileWriter.close();
         } finally {
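
The final file (the RecoveryTarget class, going by the hunk headers) replaces the boolean flag and its "recovery monitor already disabled" assertion with a counter, so the monitor can now be disabled by more than one caller at a time; that matters because doRecovery now disables it up front while later phases of the recovery may also disable it. A usage sketch of the new semantics (illustrative only), assuming two overlapping callers:

    Releasable outer = recoveryTarget.disableRecoveryMonitor(); // blocks: 0 -> 1
    Releasable inner = recoveryTarget.disableRecoveryMonitor(); // blocks: 1 -> 2

    inner.close();   // blocks: 2 -> 1, monitor remains disabled
    inner.close();   // no-op: Releasables.releaseOnce guards against double close
    outer.close();   // blocks: 1 -> 0, lastAccessTime is refreshed and the
                     // monitor's liveness checks apply again

The new assertion in closeInternal() checks that every such handle has been released by the time the RecoveryTarget itself is closed.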
