From bc534e308c6965cd539072f4d1c5e3a192e9d881 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 24 Jan 2023 07:48:03 +0000 Subject: [PATCH 1/2] Simplify `ThreadedActionListener` construction - There's no real need for the caller's `Logger` just in case an impossible situation happens - we will get enough info from the log message. - Almost nobody uses the `forceExecution` feature, default it to `false`. - Accept an `ExecutorService` rather than requiring the whole `ThreadPool` only to look up the executor later on. --- .../TransportSnapshotsStatusAction.java | 8 ++--- .../support/ThreadedActionListener.java | 34 +++++++++--------- .../cluster/InternalClusterInfoService.java | 7 ++-- .../index/shard/StoreRecovery.java | 7 ++-- .../recovery/RecoverySourceHandler.java | 11 +++--- .../recovery/plan/ShardSnapshotsService.java | 2 +- .../blobstore/BlobStoreRepository.java | 2 +- .../elasticsearch/transport/TcpTransport.java | 2 +- .../support/ThreadedActionListenerTests.java | 35 ++++++++++++++++--- .../TransportMasterNodeActionTests.java | 2 +- .../TransportHealthNodeActionTests.java | 2 +- .../concurrent/DeterministicTaskQueue.java | 5 +++ .../xpack/ccr/repository/CcrRepository.java | 4 +-- .../xpack/ccr/rest/RestCcrStatsAction.java | 7 ++-- .../xpack/ccr/rest/RestFollowStatsAction.java | 7 ++-- .../TransportDeprecationInfoAction.java | 7 ++-- .../TransportDeleteExpiredDataAction.java | 2 +- .../retention/ExpiredAnnotationsRemover.java | 7 ++-- .../retention/ExpiredForecastsRemover.java | 2 +- .../ExpiredModelSnapshotsRemover.java | 7 ++-- .../job/retention/ExpiredResultsRemover.java | 7 ++-- .../blobstore/testkit/BlobAnalyzeAction.java | 2 +- 22 files changed, 84 insertions(+), 85 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 55e98327a0a47..c0d69c0bdcf28 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -142,10 +142,9 @@ protected void masterOperation( TransportNodesSnapshotsStatus.TYPE, new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY)).snapshots(snapshots) .timeout(request.masterNodeTimeout()), + // fork to snapshot meta since building the response is expensive for large snapshots new ThreadedActionListener<>( - logger, - threadPool, - ThreadPool.Names.SNAPSHOT_META, // fork to snapshot meta since building the response is expensive for large snapshots + threadPool.executor(ThreadPool.Names.SNAPSHOT_META), ActionListener.wrap( nodeSnapshotStatuses -> buildResponse( snapshotsInProgress, @@ -156,8 +155,7 @@ protected void masterOperation( listener ), listener::onFailure - ), - false + ) ) ); } else { diff --git a/server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java b/server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java index f85a365a15006..6e575a7f7dd96 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java @@ -8,39 +8,37 @@ package org.elasticsearch.action.support; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.ExecutorService; /** - * An action listener that wraps another action listener and threading its execution. + * An action listener that wraps another action listener and dispatches its completion to an executor. */ public final class ThreadedActionListener extends ActionListener.Delegating { - private final Logger logger; - private final ThreadPool threadPool; - private final String executor; + private static final Logger logger = LogManager.getLogger(ThreadedActionListener.class); + + private final ExecutorService executor; private final boolean forceExecution; - public ThreadedActionListener( - Logger logger, - ThreadPool threadPool, - String executor, - ActionListener listener, - boolean forceExecution - ) { + public ThreadedActionListener(ExecutorService executor, ActionListener listener) { + this(executor, false, listener); + } + + public ThreadedActionListener(ExecutorService executor, boolean forceExecution, ActionListener listener) { super(listener); - this.logger = logger; - this.threadPool = threadPool; - this.executor = executor; this.forceExecution = forceExecution; + this.executor = executor; } @Override public void onResponse(final Response response) { - threadPool.executor(executor).execute(new ActionRunnable<>(delegate) { + executor.execute(new ActionRunnable<>(delegate) { @Override public boolean isForceExecution() { return forceExecution; @@ -60,7 +58,7 @@ public String toString() { @Override public void onFailure(final Exception e) { - threadPool.executor(executor).execute(new AbstractRunnable() { + executor.execute(new AbstractRunnable() { @Override public boolean isForceExecution() { return forceExecution; @@ -84,8 +82,8 @@ public void onRejection(Exception e2) { @Override public void onFailure(Exception e) { + logger.error(() -> "failed to execute failure callback on [" + ThreadedActionListener.this + "]", e); assert false : e; - logger.error(() -> "failed to execute failure callback on [" + delegate + "]", e); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 113062ee4d4d2..40f3f344c46db 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -200,9 +200,7 @@ private void fetchIndicesStats() { .stats( indicesStatsRequest, new ThreadedActionListener<>( - logger, - threadPool, - ThreadPool.Names.MANAGEMENT, + threadPool.executor(ThreadPool.Names.MANAGEMENT), ActionListener.releaseAfter(new ActionListener<>() { @Override public void onResponse(IndicesStatsResponse indicesStatsResponse) { @@ -277,8 +275,7 @@ public void onFailure(Exception e) { } indicesStatsSummary = IndicesStatsSummary.EMPTY; } - }, fetchRefs.acquire()), - false + }, fetchRefs.acquire()) ) ); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index f018ecb1bf4a3..669b9f564f62e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -543,11 +543,8 @@ private void restore( // BwC path, running against an old version master that did not add the IndexId to the recovery source repository.getRepositoryData( new ThreadedActionListener<>( - logger, - indexShard.getThreadPool(), - ThreadPool.Names.GENERIC, - indexIdListener.map(repositoryData -> repositoryData.resolveIndexId(indexId.getName())), - false + indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC), + indexIdListener.map(repositoryData -> repositoryData.resolveIndexId(indexId.getName())) ) ); } else { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index e3130fc597fff..4f6cca32a3c0a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -286,11 +286,8 @@ && isTargetSameHistory() shard.removePeerRecoveryRetentionLease( request.targetNode().getId(), new ThreadedActionListener<>( - logger, - shard.getThreadPool(), - ThreadPool.Names.GENERIC, - deleteRetentionLeaseStep, - false + shard.getThreadPool().executor(ThreadPool.Names.GENERIC), + deleteRetentionLeaseStep ) ); } catch (RetentionLeaseNotFoundException e) { @@ -980,7 +977,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener cloneRetentionLeaseStep = new StepListener<>(); final RetentionLease clonedLease = shard.cloneLocalPeerRecoveryRetentionLease( request.targetNode().getId(), - new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, cloneRetentionLeaseStep, false) + new ThreadedActionListener<>(shard.getThreadPool().executor(ThreadPool.Names.GENERIC), cloneRetentionLeaseStep) ); logger.trace("cloned primary's retention lease as [{}]", clonedLease); cloneRetentionLeaseStep.addListener(listener.map(rr -> clonedLease)); @@ -995,7 +992,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, addRetentionLeaseStep, false) + new ThreadedActionListener<>(shard.getThreadPool().executor(ThreadPool.Names.GENERIC), addRetentionLeaseStep) ); addRetentionLeaseStep.addListener(listener.map(rr -> newLease)); logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java b/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java index 762b6af6586b2..c6bb5fb1bb278 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java @@ -104,7 +104,7 @@ public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener(logger, threadPool, ThreadPool.Names.GENERIC, listener.map(this::fetchSnapshotFiles), false) + new ThreadedActionListener<>(threadPool.executor(ThreadPool.Names.GENERIC), listener.map(this::fetchSnapshotFiles)) ); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 9dea01238a02f..0c5a3417f34cb 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1982,7 +1982,7 @@ private void doGetRepositoryData(ActionListener listener) { clusterService, metadata.name(), loaded, - new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, listener.map(v -> loaded), false) + new ThreadedActionListener<>(threadPool.executor(ThreadPool.Names.GENERIC), listener.map(v -> loaded)) ); } } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 001a5bd31ba86..f211b67824da1 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -410,7 +410,7 @@ private void initiateConnection(DiscoveryNode node, ConnectionProfile connection node, connectionProfile, channels, - new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, listener, false) + new ThreadedActionListener<>(threadPool.executor(ThreadPool.Names.GENERIC), listener) ); for (TcpChannel channel : channels) { diff --git a/server/src/test/java/org/elasticsearch/action/support/ThreadedActionListenerTests.java b/server/src/test/java/org/elasticsearch/action/support/ThreadedActionListenerTests.java index 88af6e68e50e8..d5eb188d01380 100644 --- a/server/src/test/java/org/elasticsearch/action/support/ThreadedActionListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/ThreadedActionListenerTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.FixedExecutorBuilder; @@ -47,11 +48,9 @@ public void testRejectionHandling() throws InterruptedException { for (int i = 0; i < listenerCount; i++) { final var pool = randomFrom(pools); final var listener = new ThreadedActionListener( - logger, - threadPool, - pool, - ActionListener.wrap(countdownLatch::countDown), - (pool.equals("fixed-bounded-queue") || pool.startsWith("scaling")) && rarely() + threadPool.executor(pool), + (pool.equals("fixed-bounded-queue") || pool.startsWith("scaling")) && rarely(), + ActionListener.wrap(countdownLatch::countDown) ); synchronized (closeFlag) { if (closeFlag.get() && shutdownUnsafePools.contains(pool)) { @@ -76,4 +75,30 @@ public void testRejectionHandling() throws InterruptedException { assertTrue(countdownLatch.await(10, TimeUnit.SECONDS)); } + public void testToString() { + var deterministicTaskQueue = new DeterministicTaskQueue(); + + assertEquals( + "ThreadedActionListener[DeterministicTaskQueue/forkingExecutor/NoopActionListener]", + new ThreadedActionListener(deterministicTaskQueue.getThreadPool().generic(), randomBoolean(), ActionListener.noop()) + .toString() + ); + + assertEquals( + "ThreadedActionListener[DeterministicTaskQueue/forkingExecutor/NoopActionListener]/onResponse", + PlainActionFuture.get(future -> new ThreadedActionListener(deterministicTaskQueue.getThreadPool(s -> { + future.onResponse(s.toString()); + return s; + }).generic(), randomBoolean(), ActionListener.noop()).onResponse(null)) + ); + + assertEquals( + "ThreadedActionListener[DeterministicTaskQueue/forkingExecutor/NoopActionListener]/onFailure", + PlainActionFuture.get(future -> new ThreadedActionListener(deterministicTaskQueue.getThreadPool(s -> { + future.onResponse(s.toString()); + return s; + }).generic(), randomBoolean(), ActionListener.noop()).onFailure(new ElasticsearchException("test"))) + ); + } + } diff --git a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index d904310d13ff2..81d3cc9b02c8a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -246,7 +246,7 @@ class Action extends TransportMasterNodeAction { @Override protected void doExecute(Task task, final Request request, ActionListener listener) { // remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER - super.doExecute(task, request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener, false)); + super.doExecute(task, request, new ThreadedActionListener<>(threadPool.executor(ThreadPool.Names.SAME), listener)); } @Override diff --git a/server/src/test/java/org/elasticsearch/health/node/action/TransportHealthNodeActionTests.java b/server/src/test/java/org/elasticsearch/health/node/action/TransportHealthNodeActionTests.java index 91d924d8b97c9..260c595f44bc2 100644 --- a/server/src/test/java/org/elasticsearch/health/node/action/TransportHealthNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/health/node/action/TransportHealthNodeActionTests.java @@ -190,7 +190,7 @@ class Action extends TransportHealthNodeAction { @Override protected void doExecute(Task task, final Request request, ActionListener listener) { // remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER - super.doExecute(task, request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener, false)); + super.doExecute(task, request, new ThreadedActionListener<>(threadPool.executor(ThreadPool.Names.SAME), listener)); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java index b27ab2899c671..771e257860406 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java @@ -309,6 +309,11 @@ public T invokeAny(Collection> tasks, long timeout, Ti public void execute(Runnable command) { scheduleNow(runnableWrapper.apply(command)); } + + @Override + public String toString() { + return "DeterministicTaskQueue/forkingExecutor"; + } }; @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 92270b552ee4a..8f802644e72a8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -192,7 +192,7 @@ public void getSnapshotInfo(GetSnapshotInfoContext context) { : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId but saw " + snapshotIds; try { csDeduplicator.execute( - new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SNAPSHOT_META, context.map(response -> { + new ThreadedActionListener<>(threadPool.executor(ThreadPool.Names.SNAPSHOT_META), context.map(response -> { Metadata responseMetadata = response.metadata(); Map indicesMap = responseMetadata.indices(); return new SnapshotInfo( @@ -203,7 +203,7 @@ public void getSnapshotInfo(GetSnapshotInfoContext context) { response.getNodes().getMaxNodeVersion(), SnapshotState.SUCCESS ); - }), false) + })) ); } catch (Exception e) { assert false : e; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java index c3e93b418d86c..6e5c2e4f396ca 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java @@ -42,11 +42,8 @@ protected RestChannelConsumer prepareRequest(final RestRequest restRequest, fina CcrStatsAction.INSTANCE, request, new ThreadedActionListener<>( - logger, - client.threadPool(), - Ccr.CCR_THREAD_POOL_NAME, - new RestChunkedToXContentListener<>(channel), - false + client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME), + new RestChunkedToXContentListener<>(channel) ) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowStatsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowStatsAction.java index 61c93bf6ac42f..07fdaef94637c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowStatsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowStatsAction.java @@ -44,11 +44,8 @@ protected RestChannelConsumer prepareRequest(final RestRequest restRequest, fina FollowStatsAction.INSTANCE, request, new ThreadedActionListener<>( - logger, - client.threadPool(), - Ccr.CCR_THREAD_POOL_NAME, - new RestChunkedToXContentListener<>(channel), - false + client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME), + new RestChunkedToXContentListener<>(channel) ) ); } diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/TransportDeprecationInfoAction.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/TransportDeprecationInfoAction.java index 17311571188f1..3f42c3b082c1f 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/TransportDeprecationInfoAction.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/TransportDeprecationInfoAction.java @@ -126,9 +126,7 @@ protected final void masterOperation( PLUGIN_CHECKERS, components, new ThreadedActionListener<>( - logger, - client.threadPool(), - ThreadPool.Names.GENERIC, + client.threadPool().executor(ThreadPool.Names.GENERIC), listener.map( deprecationIssues -> DeprecationInfoAction.Response.from( state, @@ -140,8 +138,7 @@ protected final void masterOperation( deprecationIssues, skipTheseDeprecations ) - ), - false + ) ) ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index bdb01a2c70f06..96a106ba525ba 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -209,7 +209,7 @@ void deleteExpiredData( // thread is a disaster. remover.remove( requestsPerSecond, - new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false), + new ThreadedActionListener<>(threadPool.executor(executor), nextListener), isTimedOutSupplier ); } else { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemover.java index f0a82ca2e7741..2ef38e3e3693c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemover.java @@ -125,11 +125,8 @@ private static DeleteByQueryRequest createDBQRequest(Job job, float requestsPerS @Override void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { ThreadedActionListener threadedActionListener = new ThreadedActionListener<>( - LOGGER, - threadPool, - MachineLearning.UTILITY_THREAD_POOL_NAME, - listener, - false + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME), + listener ); latestBucketTime(client, getParentTaskId(), jobId, ActionListener.wrap(latestTime -> { if (latestTime == null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index 5adaa1cb3946a..388e971f1e063 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -100,7 +100,7 @@ public void remove(float requestsPerSec, ActionListener listener, Boole client.execute( SearchAction.INSTANCE, searchRequest, - new ThreadedActionListener<>(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false) + new ThreadedActionListener<>(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME), forecastStatsHandler) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 1e4cedb200763..1854e3b752de3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -100,11 +100,8 @@ Long getRetentionDays(Job job) { @Override void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { ThreadedActionListener threadedActionListener = new ThreadedActionListener<>( - LOGGER, - threadPool, - MachineLearning.UTILITY_THREAD_POOL_NAME, - listener, - false + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME), + listener ); latestSnapshotTimeStamp(jobId, ActionListener.wrap(latestTime -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index 6faa2c4e84982..0e5ed9bbdd572 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -150,11 +150,8 @@ private DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, lon @Override void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { ThreadedActionListener threadedActionListener = new ThreadedActionListener<>( - LOGGER, - threadPool, - MachineLearning.UTILITY_THREAD_POOL_NAME, - listener, - false + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME), + listener ); latestBucketTime(client, getParentTaskId(), jobId, ActionListener.wrap(latestTime -> { if (latestTime == null) { diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/BlobAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/BlobAnalyzeAction.java index 0046f05919071..5109166a22c48 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/BlobAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/BlobAnalyzeAction.java @@ -255,7 +255,7 @@ static class BlobAnalysis { final StepListener> readsCompleteStep = new StepListener<>(); readNodesListener = new GroupedActionListener<>( earlyReadNodes.size() + readNodes.size(), - new ThreadedActionListener<>(logger, transportService.getThreadPool(), ThreadPool.Names.SNAPSHOT, readsCompleteStep, false) + new ThreadedActionListener<>(transportService.getThreadPool().executor(ThreadPool.Names.SNAPSHOT), readsCompleteStep) ); // The order is important in this chain: if writing fails then we may never even start all the reads, and we want to cancel From 629320511b947a3fef93bf08efc8364b8ecf3a8f Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 24 Jan 2023 12:25:31 +0000 Subject: [PATCH 2/2] Clean up GENERIC and SAME usages --- .../org/elasticsearch/index/shard/StoreRecovery.java | 2 +- .../indices/recovery/RecoverySourceHandler.java | 9 +++------ .../indices/recovery/plan/ShardSnapshotsService.java | 2 +- .../repositories/blobstore/BlobStoreRepository.java | 2 +- .../java/org/elasticsearch/transport/TcpTransport.java | 2 +- .../support/master/TransportMasterNodeActionTests.java | 7 ------- .../node/action/TransportHealthNodeActionTests.java | 7 ------- .../deprecation/TransportDeprecationInfoAction.java | 2 +- 8 files changed, 8 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 669b9f564f62e..c3c875fc4c32a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -543,7 +543,7 @@ private void restore( // BwC path, running against an old version master that did not add the IndexId to the recovery source repository.getRepositoryData( new ThreadedActionListener<>( - indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC), + indexShard.getThreadPool().generic(), indexIdListener.map(repositoryData -> repositoryData.resolveIndexId(indexId.getName())) ) ); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 0ebbf9e78fd70..35286fe23d974 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -286,10 +286,7 @@ && isTargetSameHistory() // new one later on in the recovery. shard.removePeerRecoveryRetentionLease( request.targetNode().getId(), - new ThreadedActionListener<>( - shard.getThreadPool().executor(ThreadPool.Names.GENERIC), - deleteRetentionLeaseStep - ) + new ThreadedActionListener<>(shard.getThreadPool().generic(), deleteRetentionLeaseStep) ); } catch (RetentionLeaseNotFoundException e) { logger.debug("no peer-recovery retention lease for " + request.targetAllocationId()); @@ -978,7 +975,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener cloneRetentionLeaseStep = new StepListener<>(); final RetentionLease clonedLease = shard.cloneLocalPeerRecoveryRetentionLease( request.targetNode().getId(), - new ThreadedActionListener<>(shard.getThreadPool().executor(ThreadPool.Names.GENERIC), cloneRetentionLeaseStep) + new ThreadedActionListener<>(shard.getThreadPool().generic(), cloneRetentionLeaseStep) ); logger.trace("cloned primary's retention lease as [{}]", clonedLease); cloneRetentionLeaseStep.addListener(listener.map(rr -> clonedLease)); @@ -993,7 +990,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener(shard.getThreadPool().executor(ThreadPool.Names.GENERIC), addRetentionLeaseStep) + new ThreadedActionListener<>(shard.getThreadPool().generic(), addRetentionLeaseStep) ); addRetentionLeaseStep.addListener(listener.map(rr -> newLease)); logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java b/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java index c6bb5fb1bb278..3aef06c72a63e 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java @@ -104,7 +104,7 @@ public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener(threadPool.executor(ThreadPool.Names.GENERIC), listener.map(this::fetchSnapshotFiles)) + new ThreadedActionListener<>(threadPool.generic(), listener.map(this::fetchSnapshotFiles)) ); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 0c5a3417f34cb..730244de5c47f 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1982,7 +1982,7 @@ private void doGetRepositoryData(ActionListener listener) { clusterService, metadata.name(), loaded, - new ThreadedActionListener<>(threadPool.executor(ThreadPool.Names.GENERIC), listener.map(v -> loaded)) + new ThreadedActionListener<>(threadPool.generic(), listener.map(v -> loaded)) ); } } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 21340824e79ce..13e027cc94bf7 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -423,7 +423,7 @@ private void initiateConnection(DiscoveryNode node, ConnectionProfile connection node, connectionProfile, channels, - new ThreadedActionListener<>(threadPool.executor(ThreadPool.Names.GENERIC), listener) + new ThreadedActionListener<>(threadPool.generic(), listener) ); for (TcpChannel channel : channels) { diff --git a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index 81d3cc9b02c8a..04e3220e7a204 100644 --- a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -19,7 +19,6 @@ import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -243,12 +242,6 @@ class Action extends TransportMasterNodeAction { ); } - @Override - protected void doExecute(Task task, final Request request, ActionListener listener) { - // remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER - super.doExecute(task, request, new ThreadedActionListener<>(threadPool.executor(ThreadPool.Names.SAME), listener)); - } - @Override protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { listener.onResponse(new Response()); // default implementation, overridden in specific tests diff --git a/server/src/test/java/org/elasticsearch/health/node/action/TransportHealthNodeActionTests.java b/server/src/test/java/org/elasticsearch/health/node/action/TransportHealthNodeActionTests.java index 260c595f44bc2..8b9801724ed9d 100644 --- a/server/src/test/java/org/elasticsearch/health/node/action/TransportHealthNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/health/node/action/TransportHealthNodeActionTests.java @@ -15,7 +15,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -187,12 +186,6 @@ class Action extends TransportHealthNodeAction { ); } - @Override - protected void doExecute(Task task, final Request request, ActionListener listener) { - // remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER - super.doExecute(task, request, new ThreadedActionListener<>(threadPool.executor(ThreadPool.Names.SAME), listener)); - } - @Override protected void healthOperation(Task task, Request request, ClusterState state, ActionListener listener) { listener.onResponse(new Response()); diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/TransportDeprecationInfoAction.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/TransportDeprecationInfoAction.java index 3f42c3b082c1f..0ffcd0c01d18d 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/TransportDeprecationInfoAction.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/TransportDeprecationInfoAction.java @@ -126,7 +126,7 @@ protected final void masterOperation( PLUGIN_CHECKERS, components, new ThreadedActionListener<>( - client.threadPool().executor(ThreadPool.Names.GENERIC), + client.threadPool().generic(), listener.map( deprecationIssues -> DeprecationInfoAction.Response.from( state,