From 1f54c76b87ec2356db7fea395e0c801900661081 Mon Sep 17 00:00:00 2001 From: Armin Date: Thu, 24 Apr 2025 13:34:34 +0200 Subject: [PATCH 1/2] Force all per-node query response handling onto a single thread Same reasoning as for field_caps in https://github.com/elastic/elasticsearch/pull/120863, no need to have multiple threads contending the same mutex(s) when the heavy lifting step in handling the results is sequential anyway. --- .../TransportFieldCapabilitiesAction.java | 27 +----------------- .../search/QueryPhaseResultConsumer.java | 9 ++---- .../SearchQueryThenFetchAsyncAction.java | 21 ++++++++++---- .../util/concurrent/ThrottledTaskRunner.java | 28 +++++++++++++++++++ 4 files changed, 47 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 1868cd649f0ee..aa303bbd673e4 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -32,11 +32,9 @@ import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.util.Maps; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -148,7 +146,7 @@ private void doExecuteForked( if (ccsCheckCompatibility) { checkCCSVersionCompatibility(request); } - final Executor singleThreadedExecutor = buildSingleThreadedExecutor(); + final Executor singleThreadedExecutor = ThrottledTaskRunner.buildSingleThreadedExecutor("field_caps", searchCoordinationExecutor); assert task instanceof CancellableTask; final CancellableTask fieldCapTask = (CancellableTask) task; // retrieve the initial timestamp in case the action is a cross cluster search @@ -314,29 +312,6 @@ private void doExecuteForked( } } - private Executor buildSingleThreadedExecutor() { - final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("field_caps", 1, searchCoordinationExecutor); - return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() { - @Override - public void onResponse(Releasable releasable) { - try (releasable) { - r.run(); - } - } - - @Override - public void onFailure(Exception e) { - if (r instanceof AbstractRunnable abstractRunnable) { - abstractRunnable.onFailure(e); - } else { - // should be impossible, we should always submit an AbstractRunnable - logger.error("unexpected failure running " + r, e); - assert false : new AssertionError("unexpected failure running " + r, e); - } - } - }); - } - public interface RemoteRequestExecutor { void executeRemoteRequest( RemoteClusterClient remoteClient, diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java index ec63d38616153..fc430e2e2ed40 100644 --- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java @@ -188,9 +188,7 @@ MergeResult consumePartialMergeResultDataNode() { } void addBatchedPartialResult(TopDocsStats topDocsStats, MergeResult mergeResult) { - synchronized (batchedResults) { - batchedResults.add(new Tuple<>(topDocsStats, mergeResult)); - } + batchedResults.add(new Tuple<>(topDocsStats, mergeResult)); } @Override @@ -215,10 +213,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { buffer.sort(RESULT_COMPARATOR); final TopDocsStats topDocsStats = this.topDocsStats; var mergeResult = this.mergeResult; - final ArrayDeque> batchedResults; - synchronized (this.batchedResults) { - batchedResults = this.batchedResults; - } + final ArrayDeque> batchedResults = this.batchedResults; final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size(); final List topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null; final Deque> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null; diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index e552d9c9606c8..45dcfb92bafc3 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.SimpleRefCounted; import org.elasticsearch.core.TimeValue; @@ -408,7 +409,8 @@ protected void doRun(Map shardIndexMap) { } AbstractSearchAsyncAction.doCheckNoMissingShards(getName(), request, shardsIts); final Map perNodeQueries = new HashMap<>(); - final String localNodeId = searchTransportService.transportService().getLocalNode().getId(); + final var transportService = searchTransportService.transportService(); + final String localNodeId = transportService.getLocalNode().getId(); final int numberOfShardsTotal = shardsIts.size(); for (int i = 0; i < numberOfShardsTotal; i++) { final SearchShardIterator shardRoutings = shardsIts.get(i); @@ -445,6 +447,10 @@ protected void doRun(Map shardIndexMap) { } } } + final Executor singleThreadedExecutor = ThrottledTaskRunner.buildSingleThreadedExecutor( + "node_query_response", + transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION) + ); perNodeQueries.forEach((routing, request) -> { if (request.shards.size() == 1) { executeAsSingleRequest(routing, request.shards.getFirst()); @@ -463,8 +469,12 @@ protected void doRun(Map shardIndexMap) { executeWithoutBatching(routing, request); return; } - searchTransportService.transportService() - .sendChildRequest(connection, NODE_SEARCH_ACTION_NAME, request, task, new TransportResponseHandler() { + transportService.sendChildRequest( + connection, + NODE_SEARCH_ACTION_NAME, + request, + task, + new TransportResponseHandler() { @Override public NodeQueryResponse read(StreamInput in) throws IOException { return new NodeQueryResponse(in); @@ -472,7 +482,7 @@ public NodeQueryResponse read(StreamInput in) throws IOException { @Override public Executor executor() { - return EsExecutors.DIRECT_EXECUTOR_SERVICE; + return singleThreadedExecutor; } @Override @@ -517,7 +527,8 @@ public void handleException(TransportException e) { onPhaseFailure(getName(), "", cause); } } - }); + } + ); }); } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledTaskRunner.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledTaskRunner.java index 61c6c588f552d..470ca778416cb 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledTaskRunner.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledTaskRunner.java @@ -11,12 +11,40 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.core.Releasable; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import java.util.concurrent.Executor; public class ThrottledTaskRunner extends AbstractThrottledTaskRunner> { + + private static final Logger logger = LogManager.getLogger(ThrottledTaskRunner.class); + // a simple AbstractThrottledTaskRunner which fixes the task type and uses a regular FIFO blocking queue. public ThrottledTaskRunner(String name, int maxRunningTasks, Executor executor) { super(name, maxRunningTasks, executor, ConcurrentCollections.newBlockingQueue()); } + + public static Executor buildSingleThreadedExecutor(String name, Executor executor) { + final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner(name, 1, executor); + return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() { + @Override + public void onResponse(Releasable releasable) { + try (releasable) { + r.run(); + } + } + + @Override + public void onFailure(Exception e) { + if (r instanceof AbstractRunnable abstractRunnable) { + abstractRunnable.onFailure(e); + } else { + // should be impossible, we should always submit an AbstractRunnable + logger.error("unexpected failure running [" + r + "] on [" + name + "]", e); + assert false : new AssertionError("unexpected failure running " + r, e); + } + } + }); + } } From 403a55f2ed73732b7493e2e718feb285886b0aca Mon Sep 17 00:00:00 2001 From: Armin Date: Thu, 24 Apr 2025 13:46:10 +0200 Subject: [PATCH 2/2] docs --- .../common/util/concurrent/ThrottledTaskRunner.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledTaskRunner.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledTaskRunner.java index 470ca778416cb..00e4084a82fcd 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledTaskRunner.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledTaskRunner.java @@ -25,6 +25,12 @@ public ThrottledTaskRunner(String name, int maxRunningTasks, Executor executor) super(name, maxRunningTasks, executor, ConcurrentCollections.newBlockingQueue()); } + /** + * Builds an executor that executes one task at a time from a throttled task runner. + * @param name name for the executor + * @param executor executor to use for executing the tasks + * @return executor that forwards and runs a single task at a time on the provided {@code executor} + */ public static Executor buildSingleThreadedExecutor(String name, Executor executor) { final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner(name, 1, executor); return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() {