Wait for all Rec. to Stop on Node Close
* The issue is in `RecoverySourceHandler#acquireStore`: if we submit the store release to the generic thread pool while it is shutting down, we never complete the future we wait on (the wait itself also runs on the generic pool) and may never release the store. See the sketch below.
* Fixed by waiting for all recoveries to end on node close, so that we always have a healthy thread pool here.
* Closes elastic#45956
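
For illustration, a minimal, self-contained sketch of the hang using plain `java.util.concurrent` (this is not the Elasticsearch code; the class name, pool setup, and discard-on-shutdown policy are assumptions made for the demo): a cleanup task is handed to a pool that is already shutting down, so the future it was supposed to complete never completes and the waiter blocks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo, not Elasticsearch code.
public class StuckReleaseDemo {
    public static void main(String[] args) throws Exception {
        // A discard-on-shutdown policy stands in for a pool that drops work while closing.
        ThreadPoolExecutor generic = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new ThreadPoolExecutor.DiscardPolicy());

        generic.shutdown(); // node close has already begun

        // The "store release" is submitted too late: the task is silently dropped,
        // so nobody ever completes the future the releaser waits on.
        CompletableFuture<Void> released = new CompletableFuture<>();
        generic.execute(() -> released.complete(null));

        try {
            // The real code waits unboundedly; bounded here only so the demo terminates.
            released.get(2, TimeUnit.SECONDS);
            System.out.println("store released");
        } catch (TimeoutException e) {
            System.out.println("store never released: the future was never completed");
        }
    }
}
```

The commit avoids ever reaching this state: `PeerRecoverySourceService` becomes a lifecycle component whose `doStop()` calls `ongoingRecoveries.awaitEmpty()`, and `Node` stops it during close before the thread pool goes away, so recoveries finish releasing their stores while the generic pool is still healthy. The new `assert threadPool.generic().isShutdown() == false` in `acquireStore` documents that invariant.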
original-brownbear committed Aug 30, 2019
1 parent 911d02b commit 2ad3593
Showing 3 changed files with 45 additions and 8 deletions.
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
@@ -24,10 +24,13 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@@ -50,7 +53,7 @@
* The source recovery accepts recovery requests from other peer shards and start the recovery process from this
* source shard to the target shard.
*/
public class PeerRecoverySourceService implements IndexEventListener {
public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(PeerRecoverySourceService.class);

@@ -74,6 +77,19 @@ public PeerRecoverySourceService(TransportService transportService, IndicesService
new StartRecoveryTransportRequestHandler());
}

@Override
protected void doStart() {
}

@Override
protected void doStop() {
ongoingRecoveries.awaitEmpty();
}

@Override
protected void doClose() {
}

@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
Settings indexSettings) {
@@ -118,6 +134,9 @@ final int numberOfOngoingRecoveries() {
}

final class OngoingRecoveries {

private final List<ActionListener<Void>> emptyListeners = new ArrayList<>();

private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();

synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
@@ -138,6 +157,10 @@ synchronized void remove(IndexShard shard, RecoverySourceHandler handler) {
if (shardRecoveryContext.recoveryHandlers.isEmpty()) {
ongoingRecoveries.remove(shard);
}
if (ongoingRecoveries.isEmpty()) {
ActionListener.onResponse(emptyListeners, null);
emptyListeners.clear();
}
}

synchronized void cancel(IndexShard shard, String reason) {
@@ -157,6 +180,19 @@ synchronized void cancel(IndexShard shard, String reason) {
}
}

void awaitEmpty() {
assert lifecycle.stoppedOrClosed();
final PlainActionFuture<Void> future;
synchronized (this) {
if (ongoingRecoveries.isEmpty()) {
return;
}
future = new PlainActionFuture<>();
emptyListeners.add(future);
}
FutureUtils.get(future);
}

private final class ShardRecoveryContext {
final Set<RecoverySourceHandler> recoveryHandlers = new HashSet<>();

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -402,13 +402,11 @@ private Releasable acquireStore(Store store) {
store.incRef();
return Releasables.releaseOnce(() -> {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
threadPool.generic().execute(new ActionRunnable<>(future) {
@Override
protected void doRun() {
store.decRef();
listener.onResponse(null);
}
});
assert threadPool.generic().isShutdown() == false;
threadPool.generic().execute(ActionRunnable.wrap(future, l -> {
store.decRef();
l.onResponse(null);
}));
FutureUtils.get(future);
});
}
3 changes: 3 additions & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
@@ -592,6 +592,7 @@ protected Node(
.filter(p -> p instanceof LifecycleComponent)
.map(p -> (LifecycleComponent) p).collect(Collectors.toList());
resourcesToClose.addAll(pluginLifecycleComponents);
resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}), transportService.getTaskManager(),
() -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
@@ -688,6 +689,7 @@ public Node start() throws NodeValidationException {
assert localNodeFactory.getNode() != null;
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
injector.getInstance(PeerRecoverySourceService.class).start();
final MetaData onDiskMetadata;
// we load the global state here (the persistent part of the cluster state stored on disk) to
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
@@ -833,6 +835,7 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(IndicesService.class));
// close filter/fielddata caches after indices
toClose.add(injector.getInstance(IndicesStore.class));
toClose.add(injector.getInstance(PeerRecoverySourceService.class));
toClose.add(() -> stopWatch.stop().start("cluster"));
toClose.add(injector.getInstance(ClusterService.class));
toClose.add(() -> stopWatch.stop().start("node_connections_service"));