From 12a4177690e1d27931c24e2a51adcfd564d876fb Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 2 Sep 2019 15:43:51 +0200 Subject: [PATCH] Wait for all Rec. to Stop on Node Close (#46178) * Wait for all Rec. to Stop on Node Close * This issue is in the `RecoverySourceHandler#acquireStore`. If we submit the store release to the generic threadpool while it is getting shut down we never complete the future we wait on (in the generic pool as well) and fail to ever release the store potentially. * Fixed by waiting for all recoveries to end on node close so that we always have a healthy thread pool here * Closes #45956 --- .../recovery/PeerRecoverySourceService.java | 46 ++++++++++++++++++- .../recovery/RecoverySourceHandler.java | 15 +++--- .../java/org/elasticsearch/node/Node.java | 3 ++ .../PeerRecoverySourceServiceTests.java | 1 + 4 files changed, 57 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index ef47b153f5354..644a8e2eb5cb5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -24,10 +24,13 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexEventListener; import 
org.elasticsearch.index.shard.IndexShard; @@ -50,7 +53,7 @@ * The source recovery accepts recovery requests from other peer shards and start the recovery process from this * source shard to the target shard. */ -public class PeerRecoverySourceService implements IndexEventListener { +public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener { private static final Logger logger = LogManager.getLogger(PeerRecoverySourceService.class); @@ -74,6 +77,19 @@ public PeerRecoverySourceService(TransportService transportService, IndicesServi new StartRecoveryTransportRequestHandler()); } + @Override + protected void doStart() { + } + + @Override + protected void doStop() { + ongoingRecoveries.awaitEmpty(); + } + + @Override + protected void doClose() { + } + @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { @@ -118,9 +134,14 @@ final int numberOfOngoingRecoveries() { } final class OngoingRecoveries { + private final Map ongoingRecoveries = new HashMap<>(); + @Nullable + private List> emptyListeners; + synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) { + assert lifecycle.started(); final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext()); RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard); shard.recoveryStats().incCurrentAsSource(); @@ -138,6 +159,13 @@ synchronized void remove(IndexShard shard, RecoverySourceHandler handler) { if (shardRecoveryContext.recoveryHandlers.isEmpty()) { ongoingRecoveries.remove(shard); } + if (ongoingRecoveries.isEmpty()) { + if (emptyListeners != null) { + final List> onEmptyListeners = emptyListeners; + emptyListeners = null; + ActionListener.onResponse(onEmptyListeners, null); + } + } } synchronized void cancel(IndexShard shard, String reason) { @@ -157,6 +185,22 @@ synchronized void cancel(IndexShard 
shard, String reason) { } } + void awaitEmpty() { + assert lifecycle.stoppedOrClosed(); + final PlainActionFuture future; + synchronized (this) { + if (ongoingRecoveries.isEmpty()) { + return; + } + future = new PlainActionFuture<>(); + if (emptyListeners == null) { + emptyListeners = new ArrayList<>(); + } + emptyListeners.add(future); + } + FutureUtils.get(future); + } + private final class ShardRecoveryContext { final Set recoveryHandlers = new HashSet<>(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index e5ab1d7890eb4..8324dd023b703 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -402,13 +402,14 @@ private Releasable acquireStore(Store store) { store.incRef(); return Releasables.releaseOnce(() -> { final PlainActionFuture future = new PlainActionFuture<>(); - threadPool.generic().execute(new ActionRunnable<>(future) { - @Override - protected void doRun() { - store.decRef(); - listener.onResponse(null); - } - }); + assert threadPool.generic().isShutdown() == false; + // TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool. 
+ // While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures + // below and thus make it impossible for the store release to execute which in turn would block the futures forever + threadPool.generic().execute(ActionRunnable.wrap(future, l -> { + store.decRef(); + l.onResponse(null); + })); FutureUtils.get(future); }); } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index cb7888eddee96..4f892948066db 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -593,6 +593,7 @@ protected Node( .filter(p -> p instanceof LifecycleComponent) .map(p -> (LifecycleComponent) p).collect(Collectors.toList()); resourcesToClose.addAll(pluginLifecycleComponents); + resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class)); this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents); client.initialize(injector.getInstance(new Key>() {}), transportService.getTaskManager(), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService()); @@ -689,6 +690,7 @@ public Node start() throws NodeValidationException { assert localNodeFactory.getNode() != null; assert transportService.getLocalNode().equals(localNodeFactory.getNode()) : "transportService has a different local node than the factory provided"; + injector.getInstance(PeerRecoverySourceService.class).start(); final MetaData onDiskMetadata; // we load the global state here (the persistent part of the cluster state stored on disk) to // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state. 
@@ -834,6 +836,7 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(IndicesService.class)); // close filter/fielddata caches after indices toClose.add(injector.getInstance(IndicesStore.class)); + toClose.add(injector.getInstance(PeerRecoverySourceService.class)); toClose.add(() -> stopWatch.stop().start("cluster")); toClose.add(injector.getInstance(ClusterService.class)); toClose.add(() -> stopWatch.stop().start("node_connections_service")); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java index 72eb2baeca942..491c3974e5bd3 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -43,6 +43,7 @@ public void testDuplicateRecoveries() throws IOException { StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10), getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(), SequenceNumbers.UNASSIGNED_SEQ_NO); + peerRecoverySourceService.start(); RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary); DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class, () -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary));