diff --git a/server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index 7f94809e64fa6..cd9adea500dbd 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -26,6 +27,7 @@ import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -542,7 +544,7 @@ public void testRejoinWhileBeingRemoved() { }); final ClusterService dataClusterService = internalCluster().getInstance(ClusterService.class, dataNode); - final PlainActionFuture failedLeader = new PlainActionFuture<>() { + final PlainActionFuture failedLeader = new UnsafePlainActionFuture<>(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME) { @Override protected boolean blockingAllowed() { // we're deliberately blocking the cluster applier on the master until the data node starts to rejoin diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java index f507e27c6073e..9eb9041aa51f1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -299,7 +299,8 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception { final ThreadPool threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class); assertThat( PlainActionFuture.get( - f -> threadPool.generic() + // any other executor than generic and management + f -> threadPool.executor(ThreadPool.Names.SNAPSHOT) .execute( ActionRunnable.supply( f, diff --git a/server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java index e2b8fcbf2825c..938fe4c84480b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java +++ b/server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java @@ -9,10 +9,12 @@ package org.elasticsearch.action.support; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.core.CheckedConsumer; @@ -37,6 +39,7 @@ public void onResponse(@Nullable T result) { @Override public void onFailure(Exception e) { + assert assertCompleteAllowed(); if (sync.setException(Objects.requireNonNull(e))) { done(false); } @@ -113,6 +116,7 @@ public boolean isCancelled() { @Override public boolean cancel(boolean mayInterruptIfRunning) { + assert assertCompleteAllowed(); if (sync.cancel() == false) { return false; } @@ -130,6 +134,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { * @return true if the state was successfully changed. */ protected final boolean set(@Nullable T value) { + assert assertCompleteAllowed(); boolean result = sync.set(value); if (result) { done(true); @@ -399,4 +404,27 @@ public static T get(CheckedConsumer extends PlainActionFuture { + + private final String unsafeExecutor; + private final String unsafeExecutor2; + + public UnsafePlainActionFuture(String unsafeExecutor) { + this(unsafeExecutor, null); + } + + public UnsafePlainActionFuture(String unsafeExecutor, String unsafeExecutor2) { + Objects.requireNonNull(unsafeExecutor); + this.unsafeExecutor = unsafeExecutor; + this.unsafeExecutor2 = unsafeExecutor2; + } + + @Override + boolean allowedExecutors(Thread thread1, Thread thread2) { + return super.allowedExecutors(thread1, thread2) + || unsafeExecutor.equals(EsExecutors.executorName(thread1)) + || unsafeExecutor2 == null + || unsafeExecutor2.equals(EsExecutors.executorName(thread1)); + } + + public static T get(CheckedConsumer, E> e, String allowedExecutor) throws E { + PlainActionFuture fut = new UnsafePlainActionFuture<>(allowedExecutor); + e.accept(fut); + return fut.actionGet(); + } +} diff --git a/server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java index 966299408a678..f4e86c8a4eca6 100644 --- a/server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java @@ -59,6 +59,7 @@ import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.search.TransportSearchScrollAction; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.action.termvectors.MultiTermVectorsAction; import org.elasticsearch.action.termvectors.MultiTermVectorsRequest; import org.elasticsearch.action.termvectors.MultiTermVectorsRequestBuilder; @@ -410,7 +411,13 @@ protected void * on the result before it goes out of scope. * @param reference counted result type */ - private static class RefCountedFuture extends PlainActionFuture { + // todo: the use of UnsafePlainActionFuture here is quite broad, we should find a better way to be more specific + // (unless making all usages safe is easy). + private static class RefCountedFuture extends UnsafePlainActionFuture { + + private RefCountedFuture() { + super(ThreadPool.Names.GENERIC); + } @Override public final void onResponse(R result) { diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 015d3899ab90d..9bf381e6f4719 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -276,6 +276,10 @@ public static String executorName(String threadName) { return threadName.substring(executorNameStart + 1, executorNameEnd); } + public static String executorName(Thread thread) { + return executorName(thread.getName()); + } + public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) { return daemonThreadFactory(threadName(settings, namePrefix)); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/CompletionStatsCache.java b/server/src/main/java/org/elasticsearch/index/engine/CompletionStatsCache.java index f66b856471894..91eea9f6b1b12 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CompletionStatsCache.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CompletionStatsCache.java @@ -15,10 +15,12 @@ import org.apache.lucene.search.suggest.document.CompletionTerms; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.common.FieldMemoryStats; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.search.suggest.completion.CompletionStats; +import org.elasticsearch.threadpool.ThreadPool; import java.util.HashMap; import java.util.Map; @@ -42,7 +44,7 @@ public CompletionStatsCache(Supplier searcherSupplier) { } public CompletionStats get(String... fieldNamePatterns) { - final PlainActionFuture newFuture = new PlainActionFuture<>(); + final PlainActionFuture newFuture = new UnsafePlainActionFuture<>(ThreadPool.Names.MANAGEMENT); final PlainActionFuture oldFuture = completionStatsFutureRef.compareAndExchange(null, newFuture); if (oldFuture != null) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 65f47dd3994af..c219e16659c99 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.Loggers; @@ -75,6 +76,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.search.suggest.completion.CompletionStats; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transports; import java.io.Closeable; @@ -1956,7 +1958,7 @@ private boolean drainForClose() { logger.debug("drainForClose(): draining ops"); releaseEnsureOpenRef.close(); - final var future = new PlainActionFuture() { + final var future = new UnsafePlainActionFuture(ThreadPool.Names.GENERIC) { @Override protected boolean blockingAllowed() { // TODO remove this blocking, or at least do it elsewhere, see https://github.com/elastic/elasticsearch/issues/89821 diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index a389020cdcde8..442a8c3b82dc6 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; @@ -869,7 +870,7 @@ protected final void recoverUnstartedReplica( routingTable ); try { - PlainActionFuture future = new PlainActionFuture<>(); + PlainActionFuture future = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC); recovery.recoverToTarget(future); future.actionGet(); recoveryTarget.markAsDone(); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 3dc7201535e0a..d966a21a56b5f 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.VersionInformation; @@ -960,7 +961,7 @@ public void onFailure(Exception e) { protected void doRun() throws Exception { safeAwait(go); for (int iter = 0; iter < 10; iter++) { - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC); final String info = sender + "_B_" + iter; serviceB.sendRequest( nodeA, @@ -996,7 +997,7 @@ public void onFailure(Exception e) { protected void doRun() throws Exception { go.await(); for (int iter = 0; iter < 10; iter++) { - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC); final String info = sender + "_" + iter; final DiscoveryNode node = nodeB; // capture now try { @@ -3464,7 +3465,7 @@ public static void connectToNode(TransportService service, DiscoveryNode node) t * @param connectionProfile the connection profile to use when connecting to this node */ public static void connectToNode(TransportService service, DiscoveryNode node, ConnectionProfile connectionProfile) { - PlainActionFuture.get(fut -> service.connectToNode(node, connectionProfile, fut.map(x -> null))); + UnsafePlainActionFuture.get(fut -> service.connectToNode(node, connectionProfile, fut.map(x -> null)), ThreadPool.Names.GENERIC); } /** diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 481f39d673410..c5ef1d7c2bf1d 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.RefCountingListener; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.blobcache.BlobCacheMetrics; import org.elasticsearch.blobcache.BlobCacheUtils; import org.elasticsearch.blobcache.common.ByteRange; @@ -36,6 +37,7 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.monitor.fs.FsProbe; import org.elasticsearch.node.NodeRoleSettings; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -1136,7 +1138,9 @@ private int readMultiRegions( int startRegion, int endRegion ) throws InterruptedException, ExecutionException { - final PlainActionFuture readsComplete = new PlainActionFuture<>(); + final PlainActionFuture readsComplete = new UnsafePlainActionFuture<>( + BlobStoreRepository.STATELESS_SHARD_PREWARMING_THREAD_NAME + ); final AtomicInteger bytesRead = new AtomicInteger(); try (var listeners = new RefCountingListener(1, readsComplete)) { for (int region = startRegion; region <= endRegion; region++) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index baf1509c73883..67c4c769d21d1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.support.ListenerTimeouts; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.RemoteClusterClient; import org.elasticsearch.cluster.ClusterName; @@ -599,7 +600,11 @@ private void updateMappings( Client followerClient, Index followerIndex ) { - final PlainActionFuture indexMetadataFuture = new PlainActionFuture<>(); + // todo: this could manifest in production and seems we could make this async easily. + final PlainActionFuture indexMetadataFuture = new UnsafePlainActionFuture<>( + Ccr.CCR_THREAD_POOL_NAME, + ThreadPool.Names.GENERIC + ); final long startTimeInNanos = System.nanoTime(); final Supplier timeout = () -> { final long elapsedInNanos = System.nanoTime() - startTimeInNanos; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 3a16f368d322a..04a97ad9e7f95 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.action.support.replication.PostWriteRefresh; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportWriteAction; @@ -802,7 +803,7 @@ class CcrAction extends ReplicationAction listener) { - final PlainActionFuture permitFuture = new PlainActionFuture<>(); + final PlainActionFuture permitFuture = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC); primary.acquirePrimaryOperationPermit(permitFuture, EsExecutors.DIRECT_EXECUTOR_SERVICE); final TransportWriteAction.WritePrimaryResult ccrResult; final var threadpool = mock(ThreadPool.class); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunner.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunner.java index 637b37853363f..06075363997c7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunner.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunner.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.common.settings.Settings; @@ -31,6 +32,7 @@ import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.dataframe.DestinationIndex; import org.elasticsearch.xpack.ml.dataframe.stats.DataCountsTracker; import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker; @@ -100,7 +102,9 @@ public void run(String modelId) { LOGGER.info("[{}] Started inference on test data against model [{}]", config.getId(), modelId); try { - PlainActionFuture localModelPlainActionFuture = new PlainActionFuture<>(); + PlainActionFuture localModelPlainActionFuture = new UnsafePlainActionFuture<>( + MachineLearning.UTILITY_THREAD_POOL_NAME + ); modelLoadingService.getModelForInternalInference(modelId, localModelPlainActionFuture); InferenceState inferenceState = restoreInferenceState(); dataCountsTracker.setTestDocsCount(inferenceState.processedTestDocsCount); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentNodeService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentNodeService.java index e181e1fc86684..7052e6f147b36 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentNodeService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentNodeService.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -205,7 +206,9 @@ void loadQueuedModels() { if (stopped) { return; } - final PlainActionFuture listener = new PlainActionFuture<>(); + final PlainActionFuture listener = new UnsafePlainActionFuture<>( + MachineLearning.UTILITY_THREAD_POOL_NAME + ); try { deploymentManager.startDeployment(loadingTask, listener); // This needs to be synchronous here in the utility thread to keep queueing order diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/CacheService.java index 6e480a21d507a..636d138c8a3e2 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/CacheService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.blobcache.common.ByteRange; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; @@ -347,7 +348,7 @@ public void markShardAsEvictedInCache(String snapshotUUID, String snapshotIndexN if (allowShardsEvictions) { final ShardEviction shardEviction = new ShardEviction(snapshotUUID, snapshotIndexName, shardId); pendingShardsEvictions.computeIfAbsent(shardEviction, shard -> { - final PlainActionFuture future = new PlainActionFuture<>(); + final PlainActionFuture future = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC); threadPool.generic().execute(new AbstractRunnable() { @Override protected void doRun() { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index ecfce2f858428..84fa92bb7d2d4 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; @@ -2130,7 +2131,7 @@ private void reloadRemoteClusterCredentials(Settings settingsWithKeystore) { return; } - final PlainActionFuture future = new PlainActionFuture<>(); + final PlainActionFuture future = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC); getClient().execute( ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION, new TransportReloadRemoteClusterCredentialsAction.Request(settingsWithKeystore),