Skip to content

Commit

Permalink
[RECOVERY] Allow cancelling recovery sources when shards are closed
Browse files Browse the repository at this point in the history
Today recovery sources are not cancelled if a shard is closed. The recovery target
is already cancelled when shards are closed, but we should also clean up and cancel
the source side, since it holds on to shard locks / references until it's closed.
  • Loading branch information
s1monw committed Nov 20, 2014
1 parent 2f758d2 commit 339e9dd
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 70 deletions.
Expand Up @@ -39,9 +39,19 @@ public final void run() {
onFailure(ex);
} catch (Throwable t) {
onFailure(t);
} finally {
onAfter();
}
}

/**
* This method is called in a finally block after successful execution
* or on a rejection.
*/
public void onAfter() {
// nothing by default
}

/**
* This method is invoked for all exception thrown by {@link #doRun()}
*/
Expand Down
Expand Up @@ -81,7 +81,12 @@ public void execute(Runnable command) {
if (command instanceof AbstractRunnable) {
// If we are an abstract runnable we can handle the rejection
// directly and don't need to rethrow it.
((AbstractRunnable)command).onRejection(ex);
try {
((AbstractRunnable) command).onRejection(ex);
} finally {
((AbstractRunnable) command).onAfter();

}
} else {
throw ex;
}
Expand Down
Expand Up @@ -19,22 +19,31 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* The source recovery accepts recovery requests from other peer shards and start the recovery process from this
* source shard to the target shard.
Expand All @@ -54,6 +63,7 @@ public static class Actions {

private final TimeValue internalActionTimeout;
private final TimeValue internalActionLongTimeout;
private final OngoingRecoveres ongoingRecoveries = new OngoingRecoveres();


@Inject
Expand All @@ -64,6 +74,14 @@ public RecoverySource(Settings settings, TransportService transportService, Indi
this.indicesService = indicesService;
this.mappingUpdatedAction = mappingUpdatedAction;
this.clusterService = clusterService;
this.indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() {
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) {
if (indexShard != null) {
ongoingRecoveries.cancel(indexShard, "shard is closed");
}
}
});

this.recoverySettings = recoverySettings;

Expand Down Expand Up @@ -102,10 +120,14 @@ private RecoveryResponse recover(final StartRecoveryRequest request) {

logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated());

ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, internalActionTimeout,
final ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, internalActionTimeout,
internalActionLongTimeout, clusterService, indicesService, mappingUpdatedAction, logger);

shard.recover(handler);
ongoingRecoveries.add(shard, handler);
try {
shard.recover(handler);
} finally {
ongoingRecoveries.remove(shard, handler);
}
return handler.getResponse();
}

Expand All @@ -127,5 +149,45 @@ public void messageReceived(final StartRecoveryRequest request, final TransportC
channel.sendResponse(response);
}
}


private static final class OngoingRecoveres {
private final Map<IndexShard, Set<ShardRecoveryHandler>> ongoingRecoveries = new HashMap<>();

synchronized void add(IndexShard shard, ShardRecoveryHandler handler) {
Set<ShardRecoveryHandler> shardRecoveryHandlers = ongoingRecoveries.get(shard);
if (shardRecoveryHandlers == null) {
shardRecoveryHandlers = new HashSet<>();
ongoingRecoveries.put(shard, shardRecoveryHandlers);
}
assert shardRecoveryHandlers.contains(handler) == false : "Handler was already registered [" + handler + "]";
shardRecoveryHandlers.add(handler);
}

synchronized void remove(IndexShard shard, ShardRecoveryHandler handler) {
final Set<ShardRecoveryHandler> shardRecoveryHandlers = ongoingRecoveries.get(shard);
assert shardRecoveryHandlers != null : "Shard was not registered [" + shard + "]";
boolean remove = shardRecoveryHandlers.remove(handler);
assert remove : "Handler was not registered [" + handler + "]";
if (shardRecoveryHandlers.isEmpty()) {
ongoingRecoveries.remove(shard);
}
}

synchronized void cancel(IndexShard shard, String reason) {
final Set<ShardRecoveryHandler> shardRecoveryHandlers = ongoingRecoveries.get(shard);
if (shardRecoveryHandlers != null) {
final List<Exception> failures = new ArrayList<>();
for (ShardRecoveryHandler handlers : shardRecoveryHandlers) {
try {
handlers.cancel(reason);
} catch (Exception ex) {
failures.add(ex);
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(failures);
}
}
}
}

0 comments on commit 339e9dd

Please sign in to comment.