Skip to content

Commit

Permalink
Recovery: cleaner interrupt handling during cancellation
Browse files Browse the repository at this point in the history
RecoveryTarget initiates the recovery by sending a start recovery request to the source node and then waits for the recovery to complete. During recovery cancellation, we interrupt the thread so it will wake up and clean the recovery. Depending on timing, this can leave an unneeded interrupted thread status causing future IO commands to fail unneeded.

RecoverySource already had a handy utility called CancellableThreads. This extracts it to a top level class, and uses it in RecoveryTarget as well.

Closes elastic#9000
  • Loading branch information
bleskes committed Dec 19, 2014
1 parent a00b552 commit fdcac03
Show file tree
Hide file tree
Showing 6 changed files with 366 additions and 128 deletions.
145 changes: 145 additions & 0 deletions src/main/java/org/elasticsearch/common/util/CancellableThreads.java
@@ -0,0 +1,145 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;

import java.util.HashSet;
import java.util.Set;

/**
* A utility class for multi threaded operation that needs to be cancellable via interrupts. Every cancellable operation should be
* executed via {@link #execute(Interruptable)}, which will capture the executing thread and make sure it is interrupted in the case
* cancellation.
*/
public class CancellableThreads {
private final Set<Thread> threads = new HashSet<>();
private boolean cancelled = false;
private String reason;

public synchronized boolean isCancelled() {
return cancelled;
}


/** call this will throw an exception if operation was cancelled. Override {@link #onCancel(String, java.lang.Throwable)} for custom failure logic */
public synchronized void checkForCancel() {
if (isCancelled()) {
onCancel(reason, null);
}
}

/**
* called if {@link #checkForCancel()} was invoked after the operation was cancelled.
* the default implementation always throws an {@link ExecutionCancelledException}, suppressing
* any other exception that occurred before cancellation
*
* @param reason reason for failure supplied by the caller of {@link @cancel}
* @param suppressedException any error that was encountered during the execution before the operation was cancelled.
*/
protected void onCancel(String reason, @Nullable Throwable suppressedException) {
RuntimeException e = new ExecutionCancelledException("operation was cancelled reason [" + reason + "]");
if (suppressedException != null) {
e.addSuppressed(suppressedException);
}
throw e;
}

private synchronized boolean add() {
checkForCancel();
threads.add(Thread.currentThread());
// capture and clean the interrupted thread before we start, so we can identify
// our own interrupt. we do so under lock so we know we don't clear our own.
return Thread.interrupted();
}

/**
* run the Interruptable, capturing the executing thread. Concurrent calls to {@link #cancel(String)} will interrupt this thread
* causing the call to prematurely return.
*
* @param interruptable code to run
*/
public void execute(Interruptable interruptable) {
boolean wasInterrupted = add();
RuntimeException throwable = null;
try {
interruptable.run();
} catch (InterruptedException e) {
// assume this is us and ignore
} catch (RuntimeException t) {
throwable = t;
} finally {
remove();
}
// we are now out of threads collection so we can't be interrupted any more by this class
// restore old flag and see if we need to fail
if (wasInterrupted) {
Thread.currentThread().interrupt();
} else {
// clear the flag interrupted flag as we are checking for failure..
Thread.interrupted();
}
synchronized (this) {
if (isCancelled()) {
onCancel(reason, throwable);
} else if (throwable != null) {
// if we're not canceling, we throw the original exception
throw throwable;
}
}
}


private synchronized void remove() {
threads.remove(Thread.currentThread());
}

/** cancel all current running operations. Future calls to {@link #checkForCancel()} will be failed with the given reason */
public synchronized void cancel(String reason) {
if (cancelled) {
// we were already cancelled, make sure we don't interrupt threads twice
// this is important in order to make sure that we don't mark
// Thread.interrupted without handling it
return;
}
cancelled = true;
this.reason = reason;
for (Thread thread : threads) {
thread.interrupt();
}
threads.clear();
}


public interface Interruptable {
public void run() throws InterruptedException;
}

public class ExecutionCancelledException extends ElasticsearchException {

public ExecutionCancelledException(String msg) {
super(msg);
}

public ExecutionCancelledException(String msg, Throwable cause) {
super(msg, cause);
}
}
}
Expand Up @@ -23,9 +23,9 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down
Expand Up @@ -21,15 +21,15 @@

import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;

Expand All @@ -40,7 +40,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
*
Expand All @@ -64,13 +63,13 @@ public class RecoveryStatus extends AbstractRefCounted {
private final Store store;
private final RecoveryTarget.RecoveryListener listener;

private AtomicReference<Thread> waitingRecoveryThread = new AtomicReference<>();

private final AtomicBoolean finished = new AtomicBoolean();

private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
private final Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums();

private final CancellableThreads cancellableThreads = new CancellableThreads();

public RecoveryStatus(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state, RecoveryTarget.RecoveryListener listener) {
super("recovery_status");
this.recoveryId = idGenerator.incrementAndGet();
Expand Down Expand Up @@ -110,24 +109,15 @@ public RecoveryState state() {
return state;
}

public CancellableThreads CancellableThreads() {
return cancellableThreads;
}

public Store store() {
ensureRefCount();
return store;
}

/** set a thread that should be interrupted if the recovery is canceled */
public void setWaitingRecoveryThread(Thread thread) {
waitingRecoveryThread.set(thread);
}

/**
* clear the thread set by {@link #setWaitingRecoveryThread(Thread)}, making sure we
* do not override another thread.
*/
public void clearWaitingRecoveryThread(Thread threadToClear) {
waitingRecoveryThread.compareAndSet(threadToClear, null);
}

public void stage(RecoveryState.Stage stage) {
state.setStage(stage);
}
Expand All @@ -146,38 +136,42 @@ public void renameAllTempFiles() throws IOException {
store.renameFilesSafe(tempFileNames);
}

/** cancel the recovery. calling this method will clean temporary files and release the store
/**
* cancel the recovery. calling this method will clean temporary files and release the store
* unless this object is in use (in which case it will be cleaned once all ongoing users call
* {@link #decRef()}
*
* if {@link #setWaitingRecoveryThread(Thread)} was used, the thread will be interrupted.
* <p/>
* if {@link #CancellableThreads()} was used, the threads will be interrupted.
*/
public void cancel(String reason) {
if (finished.compareAndSet(false, true)) {
logger.debug("recovery canceled (reason: [{}])", reason);
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();

final Thread thread = waitingRecoveryThread.get();
if (thread != null) {
thread.interrupt();
try {
logger.debug("recovery canceled (reason: [{}])", reason);
cancellableThreads.cancel(reason);
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
}
}

/**
* fail the recovery and call listener
*
* @param e exception that encapsulating the failure
* @param e exception that encapsulating the failure
* @param sendShardFailure indicates whether to notify the master of the shard failure
**/
*/
public void fail(RecoveryFailedException e, boolean sendShardFailure) {
if (finished.compareAndSet(false, true)) {
try {
listener.onRecoveryFailure(state, e, sendShardFailure);
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
try {
cancellableThreads.cancel("failed recovery [" + e.getMessage() + "]");
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
}
}
}
Expand Down Expand Up @@ -244,7 +238,12 @@ protected void closeInternal() {
Iterator<Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, IndexOutput> entry = iterator.next();
IOUtils.closeWhileHandlingException(entry.getValue());
logger.trace("closing IndexOutput file [{}]", entry.getValue());
try {
entry.getValue().close();
} catch (Throwable t) {
logger.debug("error while closing recovery output [{}]", t, entry.getValue());
}
iterator.remove();
}
// trash temporary files
Expand Down
Expand Up @@ -35,14 +35,11 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
Expand All @@ -53,6 +50,7 @@

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;

Expand Down Expand Up @@ -162,22 +160,27 @@ private void doRecovery(final RecoveryStatus recoveryStatus) {
new RecoveryFailedException(recoveryStatus.state(), "failed to list local files", e), true);
return;
}
StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(),
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(),
false, existingFiles, recoveryStatus.state().getType(), recoveryStatus.recoveryId());

final AtomicReference<RecoveryResponse> responseHolder = new AtomicReference<>();
try {
logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());

StopWatch stopWatch = new StopWatch().start();
recoveryStatus.setWaitingRecoveryThread(Thread.currentThread());

RecoveryResponse recoveryResponse = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
recoveryStatus.CancellableThreads().execute(new CancellableThreads.Interruptable() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
public void run() throws InterruptedException {
responseHolder.set(transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
}
}).txGet());
}
}).txGet();
recoveryStatus.clearWaitingRecoveryThread(Thread.currentThread());
});
final RecoveryResponse recoveryResponse = responseHolder.get();
assert responseHolder != null;
stopWatch.stop();
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
Expand All @@ -199,6 +202,8 @@ public RecoveryResponse newInstance() {
}
// do this through ongoing recoveries to remove it from the collection
onGoingRecoveries.markRecoveryAsDone(recoveryStatus.recoveryId());
} catch (CancellableThreads.ExecutionCancelledException e) {
logger.trace("recovery cancelled", e);
} catch (Throwable e) {
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
Expand Down Expand Up @@ -494,8 +499,6 @@ public void doRun() {
try {
doRecovery(statusRef.status());
} finally {
// make sure we never interrupt the thread after we have released it back to the pool
statusRef.status().clearWaitingRecoveryThread(Thread.currentThread());
statusRef.close();
}
}
Expand Down

0 comments on commit fdcac03

Please sign in to comment.