From ce03ca76d656d367bd00921e34c25f343f8e265b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 25 Jan 2025 20:34:43 +0100 Subject: [PATCH 1/3] Force all FieldCaps response handling onto a single thread per request The current implementation uses a task per response model. This is unnecessarily costly when all results are merged into a single synchronized map. The concurrency across requests allows for concurrent deserialization of responses but becomes incredibly costly when responses collide because of the hot lock acquisitions. Also, deserializing more than a single response at a time comes with higher than necessary heap overheads (especially for the remote cluster use-case) because we hold multiple responses on-heap and deserialized but merging is sequential in the end anyway. --- .../fieldcaps/FieldCapabilitiesResponse.java | 3 ++ .../TransportFieldCapabilitiesAction.java | 53 ++++++++++++++----- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java index e0f54aeef72ea..2452560c3ded7 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java @@ -30,11 +30,14 @@ * Response for {@link FieldCapabilitiesRequest} requests. 
*/ public class FieldCapabilitiesResponse extends ActionResponse implements ChunkedToXContentObject { + public static final ParseField INDICES_FIELD = new ParseField("indices"); public static final ParseField FIELDS_FIELD = new ParseField("fields"); private static final ParseField FAILED_INDICES_FIELD = new ParseField("failed_indices"); public static final ParseField FAILURES_FIELD = new ParseField("failures"); + public static final FieldCapabilitiesResponse EMPTY = new FieldCapabilitiesResponse(Strings.EMPTY_ARRAY, Collections.emptyMap()); + private final String[] indices; private final Map> responseMap; private final List failures; diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 50dee7f85102e..acbec3bd7b82e 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -31,8 +31,11 @@ import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -52,13 +55,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Set; +import java.util.TreeMap; 
import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -141,6 +145,7 @@ private void doExecuteForked( if (ccsCheckCompatibility) { checkCCSVersionCompatibility(request); } + final Executor singleThreadedExecutor = buildSingleThreadedExecutor(); assert task instanceof CancellableTask; final CancellableTask fieldCapTask = (CancellableTask) task; // retrieve the initial timestamp in case the action is a cross cluster search @@ -158,15 +163,15 @@ private void doExecuteForked( } if (concreteIndices.length == 0 && remoteClusterIndices.isEmpty()) { - listener.onResponse(new FieldCapabilitiesResponse(new String[0], Collections.emptyMap())); + listener.onResponse(FieldCapabilitiesResponse.EMPTY); return; } checkIndexBlocks(clusterState, concreteIndices); final FailureCollector indexFailures = new FailureCollector(); - final Map indexResponses = Collections.synchronizedMap(new HashMap<>()); + final NavigableMap indexResponses = new TreeMap<>(); // This map is used to share the index response for indices which have the same index mapping hash to reduce the memory usage. 
- final Map indexMappingHashToResponses = Collections.synchronizedMap(new HashMap<>()); + final Map indexMappingHashToResponses = new HashMap<>(); final Runnable releaseResourcesOnCancel = () -> { LOGGER.trace("clear index responses on cancellation"); indexFailures.clear(); @@ -225,7 +230,7 @@ private void doExecuteForked( final var finishedOrCancelled = new AtomicBoolean(); fieldCapTask.addListener(() -> { if (finishedOrCancelled.compareAndSet(false, true)) { - releaseResourcesOnCancel.run(); + singleThreadedExecutor.execute(releaseResourcesOnCancel); } }); try (RefCountingRunnable refs = new RefCountingRunnable(() -> { @@ -245,7 +250,7 @@ private void doExecuteForked( localIndices, nowInMillis, concreteIndices, - searchCoordinationExecutor, + singleThreadedExecutor, handleIndexResponse, handleIndexFailure, refs.acquire()::close @@ -260,7 +265,7 @@ private void doExecuteForked( var remoteClusterClient = transportService.getRemoteClusterService() .getRemoteClusterClient( clusterAlias, - searchCoordinationExecutor, + singleThreadedExecutor, RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE ); FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis); @@ -295,7 +300,7 @@ private void doExecuteForked( // This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator. 
// TODO: remove this workaround after we fixed https://github.com/elastic/elasticsearch/issues/107439 new ForkingOnFailureActionListener<>( - searchCoordinationExecutor, + singleThreadedExecutor, true, ActionListener.releaseAfter(remoteListener, refs.acquire()) ) @@ -304,6 +309,29 @@ private void doExecuteForked( } } + private Executor buildSingleThreadedExecutor() { + final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("field_caps", 1, searchCoordinationExecutor); + return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() { + @Override + public void onResponse(Releasable releasable) { + try (releasable) { + r.run(); + } + } + + @Override + public void onFailure(Exception e) { + if (r instanceof AbstractRunnable abstractRunnable) { + abstractRunnable.onFailure(e); + } else { + // should be impossible, we should always submit an AbstractRunnable + logger.error("unexpected failure running " + r, e); + assert false : new AssertionError("unexpected failure running " + r, e); + } + } + }); + } + public interface RemoteRequestExecutor { void executeRemoteRequest( RemoteClusterClient remoteClient, @@ -327,12 +355,12 @@ private static void checkIndexBlocks(ClusterState clusterState, String[] concret private static void mergeIndexResponses( FieldCapabilitiesRequest request, CancellableTask task, - Map indexResponses, + NavigableMap indexResponses, FailureCollector indexFailures, ActionListener listener ) { List failures = indexFailures.build(indexResponses.keySet()); - if (indexResponses.size() > 0) { + if (indexResponses.isEmpty() == false) { if (request.isMergeResults()) { ActionListener.completeWith(listener, () -> merge(indexResponses, task, request, failures)); } else { @@ -375,7 +403,7 @@ private static boolean hasSameMappingHash(FieldCapabilitiesIndexResponse r1, Fie } private static FieldCapabilitiesResponse merge( - Map indexResponsesMap, + NavigableMap indexResponsesMap, CancellableTask task, FieldCapabilitiesRequest request, List 
failures @@ -383,7 +411,6 @@ private static FieldCapabilitiesResponse merge( assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION); // too expensive to run this on a transport worker task.ensureNotCancelled(); final FieldCapabilitiesIndexResponse[] indexResponses = indexResponsesMap.values().toArray(new FieldCapabilitiesIndexResponse[0]); - Arrays.sort(indexResponses, Comparator.comparing(FieldCapabilitiesIndexResponse::getIndexName)); final String[] indices = Arrays.stream(indexResponses).map(FieldCapabilitiesIndexResponse::getIndexName).toArray(String[]::new); final Map> responseMapBuilder = new HashMap<>(); int lastPendingIndex = 0; @@ -541,7 +568,7 @@ private static void innerMerge( * list, these failures will be skipped because they have no affect on the final response. */ private static final class FailureCollector { - private final Map failuresByIndex = Collections.synchronizedMap(new HashMap<>()); + private final Map failuresByIndex = new HashMap<>(); List build(Set successfulIndices) { Map, FieldCapabilitiesFailure> indexFailures = new HashMap<>(); From e3d5b024617e0cc387196b302e4611fccc888343 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 25 Jan 2025 20:44:32 +0100 Subject: [PATCH 2/3] cleanup --- .../fieldcaps/FieldCapabilitiesResponse.java | 3 --- .../TransportFieldCapabilitiesAction.java | 14 +++++++------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java index 2452560c3ded7..e0f54aeef72ea 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java @@ -30,14 +30,11 @@ * Response for {@link FieldCapabilitiesRequest} requests. 
*/ public class FieldCapabilitiesResponse extends ActionResponse implements ChunkedToXContentObject { - public static final ParseField INDICES_FIELD = new ParseField("indices"); public static final ParseField FIELDS_FIELD = new ParseField("fields"); private static final ParseField FAILED_INDICES_FIELD = new ParseField("failed_indices"); public static final ParseField FAILURES_FIELD = new ParseField("failures"); - public static final FieldCapabilitiesResponse EMPTY = new FieldCapabilitiesResponse(Strings.EMPTY_ARRAY, Collections.emptyMap()); - private final String[] indices; private final Map> responseMap; private final List failures; diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index acbec3bd7b82e..1ab4b07fe4213 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -55,14 +55,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NavigableMap; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -163,13 +162,13 @@ private void doExecuteForked( } if (concreteIndices.length == 0 && remoteClusterIndices.isEmpty()) { - listener.onResponse(FieldCapabilitiesResponse.EMPTY); + listener.onResponse(new FieldCapabilitiesResponse(new String[0], Collections.emptyMap())); return; } checkIndexBlocks(clusterState, concreteIndices); final FailureCollector indexFailures = new FailureCollector(); - final NavigableMap indexResponses = new TreeMap<>(); 
+ final Map indexResponses = new HashMap<>(); // This map is used to share the index response for indices which have the same index mapping hash to reduce the memory usage. final Map indexMappingHashToResponses = new HashMap<>(); final Runnable releaseResourcesOnCancel = () -> { @@ -355,12 +354,12 @@ private static void checkIndexBlocks(ClusterState clusterState, String[] concret private static void mergeIndexResponses( FieldCapabilitiesRequest request, CancellableTask task, - NavigableMap indexResponses, + Map indexResponses, FailureCollector indexFailures, ActionListener listener ) { List failures = indexFailures.build(indexResponses.keySet()); - if (indexResponses.isEmpty() == false) { + if (indexResponses.size() > 0) { if (request.isMergeResults()) { ActionListener.completeWith(listener, () -> merge(indexResponses, task, request, failures)); } else { @@ -403,7 +402,7 @@ private static boolean hasSameMappingHash(FieldCapabilitiesIndexResponse r1, Fie } private static FieldCapabilitiesResponse merge( - NavigableMap indexResponsesMap, + Map indexResponsesMap, CancellableTask task, FieldCapabilitiesRequest request, List failures @@ -411,6 +410,7 @@ private static FieldCapabilitiesResponse merge( assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION); // too expensive to run this on a transport worker task.ensureNotCancelled(); final FieldCapabilitiesIndexResponse[] indexResponses = indexResponsesMap.values().toArray(new FieldCapabilitiesIndexResponse[0]); + Arrays.sort(indexResponses, Comparator.comparing(FieldCapabilitiesIndexResponse::getIndexName)); final String[] indices = Arrays.stream(indexResponses).map(FieldCapabilitiesIndexResponse::getIndexName).toArray(String[]::new); final Map> responseMapBuilder = new HashMap<>(); int lastPendingIndex = 0; From 151c3a9da4b789e115511a3b8965cb2a98dc7cb6 Mon Sep 17 00:00:00 2001 From: Armin Date: Thu, 24 Apr 2025 00:07:28 +0200 Subject: [PATCH 3/3] fix test --- 
.../org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java | 2 +- .../action/fieldcaps/TransportFieldCapabilitiesAction.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java index 2dc5c4e7d92e6..6450b964f1f5b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java @@ -718,7 +718,7 @@ public void testCancel() throws Exception { "clear resources", TransportFieldCapabilitiesAction.class.getCanonicalName(), Level.TRACE, - "clear index responses on cancellation" + "clear index responses on cancellation submitted" ) ); BlockingOnRewriteQueryBuilder.blockOnRewrite(); diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 3cba0dec3ebe2..1868cd649f0ee 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -234,6 +234,7 @@ private void doExecuteForked( fieldCapTask.addListener(() -> { if (finishedOrCancelled.compareAndSet(false, true)) { singleThreadedExecutor.execute(releaseResourcesOnCancel); + LOGGER.trace("clear index responses on cancellation submitted"); } }); try (RefCountingRunnable refs = new RefCountingRunnable(() -> {