/* Patch summary: TransportFieldCapabilitiesAction.doExecuteForked now passes an executor obtained from the new buildSingleThreadedExecutor() wherever it previously passed searchCoordinationExecutor, three Collections.synchronizedMap wrappers become plain HashMap, and FieldCapabilitiesIT.testCancel expects the new trace message. */
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java index 2dc5c4e7d92e6..6450b964f1f5b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java @@ -718,7 +718,7 @@ public void testCancel() throws Exception { "clear resources", TransportFieldCapabilitiesAction.class.getCanonicalName(), Level.TRACE, - "clear index responses on cancellation" + "clear index responses on cancellation submitted" ) ); BlockingOnRewriteQueryBuilder.blockOnRewrite(); diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 5da32824b859a..1868cd649f0ee 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -32,8 +32,11 @@ import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -145,6 +148,7 @@ private void doExecuteForked( if (ccsCheckCompatibility) { checkCCSVersionCompatibility(request); } + final Executor singleThreadedExecutor = buildSingleThreadedExecutor(); /* a fresh executor is built on every doExecuteForked call; the hunks below hand it to the local index requests, the remote cluster client, the remote-response fork and the cancellation cleanup */ assert task instanceof 
CancellableTask; final CancellableTask fieldCapTask = (CancellableTask) task; // retrieve the initial timestamp in case the action is a cross cluster search @@ -168,9 +172,9 @@ private void doExecuteForked( checkIndexBlocks(projectState, concreteIndices); final FailureCollector indexFailures = new FailureCollector(); - final Map indexResponses = Collections.synchronizedMap(new HashMap<>()); + final Map indexResponses = new HashMap<>(); // This map is used to share the index response for indices which have the same index mapping hash to reduce the memory usage. - final Map indexMappingHashToResponses = Collections.synchronizedMap(new HashMap<>()); + final Map indexMappingHashToResponses = new HashMap<>(); /* NOTE(review): both maps drop the Collections.synchronizedMap wrapper in this change; presumably every access now runs on singleThreadedExecutor - confirm that no other thread touches them */ final Runnable releaseResourcesOnCancel = () -> { LOGGER.trace("clear index responses on cancellation"); indexFailures.clear(); @@ -229,7 +233,8 @@ private void doExecuteForked( final var finishedOrCancelled = new AtomicBoolean(); fieldCapTask.addListener(() -> { if (finishedOrCancelled.compareAndSet(false, true)) { - releaseResourcesOnCancel.run(); + singleThreadedExecutor.execute(releaseResourcesOnCancel); + LOGGER.trace("clear index responses on cancellation submitted"); /* logged right after the cleanup is handed to singleThreadedExecutor; the cleanup itself logs the shorter message when it runs, and the test hunk above now expects this longer one */ } }); try (RefCountingRunnable refs = new RefCountingRunnable(() -> { @@ -250,7 +255,7 @@ private void doExecuteForked( localIndices, nowInMillis, concreteIndices, - searchCoordinationExecutor, + singleThreadedExecutor, handleIndexResponse, handleIndexFailure, refs.acquire()::close @@ -265,7 +270,7 @@ private void doExecuteForked( var remoteClusterClient = transportService.getRemoteClusterService() .getRemoteClusterClient( clusterAlias, - searchCoordinationExecutor, + singleThreadedExecutor, RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE ); FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis); @@ -300,7 +305,7 @@ private void doExecuteForked( // This fork is a workaround to ensure that the merging of field-caps always 
occurs on the search_coordinator. // TODO: remove this workaround after we fixed https://github.com/elastic/elasticsearch/issues/107439 new ForkingOnFailureActionListener<>( - searchCoordinationExecutor, + singleThreadedExecutor, true, ActionListener.releaseAfter(remoteListener, refs.acquire()) ) @@ -309,6 +314,29 @@ private void doExecuteForked( } } + private Executor buildSingleThreadedExecutor() { /* Returns an Executor that enqueues each submitted Runnable on a ThrottledTaskRunner named field_caps, constructed with 1 as its second argument on top of searchCoordinationExecutor. NOTE(review): presumably that argument caps concurrently running tasks at one, which the now-unsynchronized maps rely on - confirm against ThrottledTaskRunner */ + final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("field_caps", 1, searchCoordinationExecutor); + return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() { + @Override + public void onResponse(Releasable releasable) { + try (releasable) { /* try-with-resources: the Releasable is closed after r.run() returns or throws */ + r.run(); + } + } + + @Override + public void onFailure(Exception e) { + if (r instanceof AbstractRunnable abstractRunnable) { + abstractRunnable.onFailure(e); + } else { + // should be impossible, we should always submit an AbstractRunnable + logger.error("unexpected failure running " + r, e); /* NOTE(review): releaseResourcesOnCancel is declared as a plain Runnable lambda and the cancellation listener submits it through this executor, so it would land in this branch if enqueueTask ever reports a failure - verify that path is unreachable */ + assert false : new AssertionError("unexpected failure running " + r, e); + } + } + }); + } + public interface RemoteRequestExecutor { void executeRemoteRequest( RemoteClusterClient remoteClient, @@ -546,7 +574,7 @@ private static void innerMerge( * list, these failures will be skipped because they have no affect on the final response. */ private static final class FailureCollector { - private final Map failuresByIndex = Collections.synchronizedMap(new HashMap<>()); + private final Map failuresByIndex = new HashMap<>(); /* NOTE(review): no longer a synchronized map; assumes all callers of this collector are serialized - confirm */ List build(Set successfulIndices) { Map, FieldCapabilitiesFailure> indexFailures = new HashMap<>();