From 080c1a13497eb1a56b202db534cebf95b9f0733d Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 9 Sep 2025 13:02:29 -0400 Subject: [PATCH 1/9] ESQL: Limit when we push topn to lucene Right now we push all topn operations to lucene if possible. But Lucene was not written to handle a topn of 100,000. It's very fast, but it allocates more memory than we'd like. This limits the size of the topns that we push to lucene to the 10,000, which is the default window size limit. We'll run a regular lucene scan with our own in-engine topn instead. That's designed to scan huge numbers of documents. It doesn't have the nice min_competitive optimization. But it tracks memory very well. --- .../xpack/esql/qa/single_node/RestEsqlIT.java | 76 ++++++++++++++++++- .../planner/EsPhysicalOperationProviders.java | 2 +- .../xpack/esql/planner/PhysicalSettings.java | 33 +++++++- .../xpack/esql/plugin/EsqlPlugin.java | 1 + .../optimizer/PhysicalPlanOptimizerTests.java | 2 +- .../EsPhysicalOperationProvidersTests.java | 2 +- .../planner/LocalExecutionPlannerTests.java | 2 +- 7 files changed, 111 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 4390d730fcae3..eb60ab3bb4910 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; 
import org.elasticsearch.xpack.esql.tools.ProfileParser; import org.hamcrest.Matchers; @@ -538,6 +539,74 @@ public void testInlineStatsProfile() throws IOException { ); } + public void testSmallTopNProfile() throws IOException { + testTopNProfile(false); + } + + public void testGiantTopNProfile() throws IOException { + testTopNProfile(true); + } + + private void testTopNProfile(boolean giant) throws IOException { + indexTimestampData(1); + + int size = between(1, PhysicalSettings.LUCENE_TOPN_LIMIT.get(Settings.EMPTY).intValue() - 1); + if (giant) { + size += PhysicalSettings.LUCENE_TOPN_LIMIT.get(Settings.EMPTY).intValue(); + } + RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | KEEP value | SORT value ASC | LIMIT " + size); + + builder.pragmas(Settings.builder().put("data_partitioning", "shard").build()); + builder.profile(true); + builder.pragmasOk(); + + Map result = runEsql(builder); + ListMatcher values = matchesList(); + for (int i = 0; i < 1000; i++) { + values = values.item(List.of(i)); + } + assertResultMap( + result, + getResultMatcher(result).entry("profile", getProfileMatcher()), + matchesList().item(matchesMap().entry("name", "value").entry("type", "long")), + values + ); + + @SuppressWarnings("unchecked") + List> profiles = (List>) ((Map) result.get("profile")).get("drivers"); + for (Map p : profiles) { + fixTypesOnProfile(p); + assertThat(p, commonProfile()); + List sig = new ArrayList<>(); + @SuppressWarnings("unchecked") + List> operators = (List>) p.get("operators"); + for (Map o : operators) { + sig.add(checkOperatorProfile(o)); + } + String description = p.get("description").toString(); + switch (description) { + case "data" -> assertMap( + sig, + giant + ? 
matchesList().item("LuceneSourceOperator") + .item("ValuesSourceReaderOperator") + .item("ProjectOperator") + .item("ExchangeSinkOperator") + : matchesList().item("LuceneTopNSourceOperator") + .item("ValuesSourceReaderOperator") + .item("ProjectOperator") + .item("ExchangeSinkOperator") + ); + case "node_reduce" -> assertThat(sig, matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator")); + case "final" -> assertMap( + sig, + matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("ProjectOperator").item("OutputOperator") + ); + default -> throw new IllegalArgumentException("can't match " + description); + } + } + } + public void testForceSleepsProfile() throws IOException { assumeTrue("requires pragmas", Build.current().isSnapshot()); @@ -940,7 +1009,9 @@ private String checkOperatorProfile(Map o) { .entry("rows_received", greaterThan(0)) .entry("rows_emitted", greaterThan(0)) .entry("ram_used", instanceOf(String.class)) - .entry("ram_bytes_used", greaterThan(0)); + .entry("ram_bytes_used", greaterThan(0)) + .entry("receive_nanos", greaterThan(0)) + .entry("emit_nanos", greaterThan(0)); case "LuceneTopNSourceOperator" -> matchesMap().entry("pages_emitted", greaterThan(0)) .entry("rows_emitted", greaterThan(0)) .entry("current", greaterThan(0)) @@ -951,7 +1022,8 @@ private String checkOperatorProfile(Map o) { .entry("slice_min", 0) .entry("process_nanos", greaterThan(0)) .entry("processed_queries", List.of("ConstantScore(*:*)")) - .entry("slice_index", 0); + .entry("slice_index", 0) + .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD")); default -> throw new AssertionError("unexpected status: " + o); }; MapMatcher expectedOp = matchesMap().entry("operator", startsWith(name)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 
dc0f4ea7dcdd3..51ca0d6d6bd96 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -281,7 +281,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, int rowEstimatedSize = esQueryExec.estimatedRowSize(); int limit = esQueryExec.limit() != null ? (Integer) esQueryExec.limit().fold(context.foldCtx()) : NO_LIMIT; boolean scoring = esQueryExec.hasScoring(); - if ((sorts != null && sorts.isEmpty() == false)) { + if (sorts != null && sorts.isEmpty() == false && limit < physicalSettings.luceneTopNLimit()) { List> sortBuilders = new ArrayList<>(sorts.size()); long estimatedPerRowSortSize = 0; for (Sort sort : sorts) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java index 4276eeaf39f9b..369fc73d4fdd3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java @@ -9,9 +9,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.MemorySizeValue; import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.monitor.jvm.JvmInfo; /** @@ -35,8 +37,17 @@ public class PhysicalSettings { Setting.Property.Dynamic ); + public static final Setting LUCENE_TOPN_LIMIT = Setting.intSetting( + "esql.lucene_topn_limit", + IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(Settings.EMPTY), + -1, + Setting.Property.NodeScope, + 
Setting.Property.Dynamic + ); + private volatile DataPartitioning defaultDataPartitioning; private volatile ByteSizeValue valuesLoadingJumboSize; + private volatile int luceneTopNLimit; /** * Ctor for prod that listens for updates from the {@link ClusterService}. @@ -44,14 +55,16 @@ public class PhysicalSettings { public PhysicalSettings(ClusterService clusterService) { clusterService.getClusterSettings().initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v); clusterService.getClusterSettings().initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v); + clusterService.getClusterSettings().initializeAndWatch(LUCENE_TOPN_LIMIT, v -> this.luceneTopNLimit = v); } /** * Ctor for testing. */ - public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize) { + public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize, int luceneTopNLimit) { this.defaultDataPartitioning = defaultDataPartitioning; this.valuesLoadingJumboSize = valuesLoadingJumboSize; + this.luceneTopNLimit = luceneTopNLimit; } public DataPartitioning defaultDataPartitioning() { @@ -61,4 +74,22 @@ public DataPartitioning defaultDataPartitioning() { public ByteSizeValue valuesLoadingJumboSize() { return valuesLoadingJumboSize; } + + /** + * Maximum {@code LIMIT} that we're willing to push to Lucene's topn. + *

+ * Lucene's topn code was designed for search + * which typically fetches 10 or 30 or 50 or 100 or 1000 documents. + * That's as many as you want on a page, and that's what it's designed for. + * But if you go to, say, page 10, Lucene implements this as a search + * for {@code page_size * page_number} docs and then materializes only + * the last {@code page_size} documents. Traditionally, Elasticsearch + * limits that {@code page_size * page_number} which it calls the + * {@link IndexSettings#MAX_RESULT_WINDOW_SETTING "result window"}. + * So! ESQL uses the same default - {@code 10,000}. + *

+ */ + public int luceneTopNLimit() { + return luceneTopNLimit; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index be019af8e03a9..c9e364ec85037 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -258,6 +258,7 @@ public List> getSettings() { ESQL_QUERYLOG_INCLUDE_USER_SETTING, PhysicalSettings.DEFAULT_DATA_PARTITIONING, PhysicalSettings.VALUES_LOADING_JUMBO_SIZE, + PhysicalSettings.LUCENE_TOPN_LIMIT, STORED_FIELDS_SEQUENTIAL_PROPORTION, EsqlFlags.ESQL_STRING_LIKE_ON_INDEX, EsqlFlags.ESQL_ROUNDTO_PUSHDOWN_THRESHOLD diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 9e255d9322e73..c151d1f2f082c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -7903,7 +7903,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP FoldContext.small(), List.of(), null, - new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1)) + new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000) ), List.of() ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java index 6430967584553..23bc2f2b2990b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java +++ 
b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java @@ -82,7 +82,7 @@ record TestCase(QueryBuilder query, List nullsFilteredFields) { FoldContext.small(), List.of(new EsPhysicalOperationProviders.DefaultShardContext(0, () -> {}, createMockContext(), AliasFilter.EMPTY)), null, - new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1)) + new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000) ); for (TestCase testCase : testCases) { EsQueryExec queryExec = new EsQueryExec( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 83446d0f5fa80..e9e318d25fb4c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -328,7 +328,7 @@ private EsPhysicalOperationProviders esPhysicalOperationProviders(List Date: Wed, 10 Sep 2025 15:23:30 -0400 Subject: [PATCH 2/9] Update docs/changelog/134497.yaml --- docs/changelog/134497.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/134497.yaml diff --git a/docs/changelog/134497.yaml b/docs/changelog/134497.yaml new file mode 100644 index 0000000000000..3fc5e53e68e97 --- /dev/null +++ b/docs/changelog/134497.yaml @@ -0,0 +1,5 @@ +pr: 134497 +summary: Limit when we push topn to lucene +area: ES|QL +type: bug +issues: [] From 387d49d1bbc6be1b6d942ee3ed7a160d55832344 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 10 Sep 2025 20:09:02 -0400 Subject: [PATCH 3/9] Oh no --- .../org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java | 1 + .../org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java | 1 + 2 files changed, 2 insertions(+) diff --git 
a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index d106f7bf4d740..130427af0bfc1 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -92,6 +92,7 @@ public void skipOnAborted() { */ public void testSortByManyLongsSuccess() throws IOException { initManyLongs(10); + // | SORT a, b, i0, i1, ...i500 | KEEP a, b | LIMIT 10000 Map response = sortByManyLongs(500); ListMatcher columns = matchesList().item(matchesMap().entry("name", "a").entry("type", "long")) .item(matchesMap().entry("name", "b").entry("type", "long")); diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index db387eb515464..a04cdd210d28c 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -588,6 +588,7 @@ private void testTopNProfile(boolean giant) throws IOException { case "data" -> assertMap( sig, giant + // NOCOMMIT this should include a TopN ? 
matchesList().item("LuceneSourceOperator") .item("ValuesSourceReaderOperator") .item("ProjectOperator") From 18e2f0a02dfdc0eedb7fb96afdf2468d5066714b Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 10 Sep 2025 21:12:55 -0400 Subject: [PATCH 4/9] Move turn off --- .../xpack/esql/qa/single_node/RestEsqlIT.java | 121 ++++++++++-------- .../LocalPhysicalOptimizerContext.java | 9 +- .../physical/local/PushTopNToSource.java | 28 +++- .../planner/EsPhysicalOperationProviders.java | 2 +- .../xpack/esql/planner/PlannerUtils.java | 6 +- .../xpack/esql/plugin/ComputeService.java | 1 + .../elasticsearch/xpack/esql/CsvTests.java | 6 +- .../LocalPhysicalPlanOptimizerTests.java | 8 +- .../optimizer/PhysicalPlanOptimizerTests.java | 8 +- .../esql/optimizer/TestPlannerOptimizer.java | 6 +- .../physical/local/PushTopNToSourceTests.java | 11 +- 11 files changed, 140 insertions(+), 66 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index a04cdd210d28c..05b33013b30b7 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -29,7 +29,6 @@ import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin; import org.elasticsearch.xpack.esql.core.type.DataType; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; import org.elasticsearch.xpack.esql.tools.ProfileParser; import org.hamcrest.Matchers; @@ -60,6 +59,7 @@ import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; 
import static org.elasticsearch.xpack.esql.core.type.DataType.isMillisOrNanos; +import static org.elasticsearch.xpack.esql.planner.PhysicalSettings.LUCENE_TOPN_LIMIT; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC; import static org.elasticsearch.xpack.esql.tools.ProfileParser.parseProfile; import static org.elasticsearch.xpack.esql.tools.ProfileParser.readProfileFromResponse; @@ -189,6 +189,18 @@ private void setLoggingLevel(String level) throws IOException { client().performRequest(request); } + private void setTruncationWindowMax(Integer size) throws IOException { + Request request = new Request("PUT", "/_cluster/settings"); + request.setJsonEntity(""" + { + "persistent": { + "esql.query.result_truncation_max_size": $SIZE$ + } + } + """.replace("$SIZE$", size == null ? "null" : Integer.toString(size))); + client().performRequest(request); + } + public void testIncompatibleMappingsErrors() throws IOException { // create first index Request request = new Request("PUT", "/index1"); @@ -548,63 +560,68 @@ public void testGiantTopNProfile() throws IOException { } private void testTopNProfile(boolean giant) throws IOException { - indexTimestampData(1); + try { + setTruncationWindowMax(1000000); + indexTimestampData(1); - int size = between(1, PhysicalSettings.LUCENE_TOPN_LIMIT.get(Settings.EMPTY).intValue() - 1); - if (giant) { - size += PhysicalSettings.LUCENE_TOPN_LIMIT.get(Settings.EMPTY).intValue(); - } - RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | KEEP value | SORT value ASC | LIMIT " + size); + int size = between(1, LUCENE_TOPN_LIMIT.get(Settings.EMPTY).intValue() - 1); + if (giant) { + size += LUCENE_TOPN_LIMIT.get(Settings.EMPTY).intValue(); + } + RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | KEEP value | SORT value ASC | LIMIT " + size); - builder.pragmas(Settings.builder().put("data_partitioning", "shard").build()); - builder.profile(true); - builder.pragmasOk(); 
+ builder.pragmas(Settings.builder().put("data_partitioning", "shard").build()); + builder.profile(true); + builder.pragmasOk(); - Map result = runEsql(builder); - ListMatcher values = matchesList(); - for (int i = 0; i < 1000; i++) { - values = values.item(List.of(i)); - } - assertResultMap( - result, - getResultMatcher(result).entry("profile", getProfileMatcher()), - matchesList().item(matchesMap().entry("name", "value").entry("type", "long")), - values - ); + Map result = runEsql(builder); + ListMatcher values = matchesList(); + for (int i = 0; i < 1000; i++) { + values = values.item(List.of(i)); + } + assertResultMap( + result, + getResultMatcher(result).entry("profile", getProfileMatcher()), + matchesList().item(matchesMap().entry("name", "value").entry("type", "long")), + values + ); - @SuppressWarnings("unchecked") - List> profiles = (List>) ((Map) result.get("profile")).get("drivers"); - for (Map p : profiles) { - fixTypesOnProfile(p); - assertThat(p, commonProfile()); - List sig = new ArrayList<>(); @SuppressWarnings("unchecked") - List> operators = (List>) p.get("operators"); - for (Map o : operators) { - sig.add(checkOperatorProfile(o)); - } - String description = p.get("description").toString(); - switch (description) { - case "data" -> assertMap( - sig, - giant - // NOCOMMIT this should include a TopN - ? 
matchesList().item("LuceneSourceOperator") - .item("ValuesSourceReaderOperator") - .item("ProjectOperator") - .item("ExchangeSinkOperator") - : matchesList().item("LuceneTopNSourceOperator") - .item("ValuesSourceReaderOperator") - .item("ProjectOperator") - .item("ExchangeSinkOperator") - ); - case "node_reduce" -> assertThat(sig, matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator")); - case "final" -> assertMap( - sig, - matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("ProjectOperator").item("OutputOperator") - ); - default -> throw new IllegalArgumentException("can't match " + description); + List> profiles = (List>) ((Map) result.get("profile")).get("drivers"); + for (Map p : profiles) { + fixTypesOnProfile(p); + assertThat(p, commonProfile()); + List sig = new ArrayList<>(); + @SuppressWarnings("unchecked") + List> operators = (List>) p.get("operators"); + for (Map o : operators) { + sig.add(checkOperatorProfile(o)); + } + String description = p.get("description").toString(); + switch (description) { + case "data" -> assertMap( + sig, + giant + ? 
matchesList().item("LuceneSourceOperator") + .item("ValuesSourceReaderOperator") + .item("TopNOperator") + .item("ProjectOperator") + .item("ExchangeSinkOperator") + : matchesList().item("LuceneTopNSourceOperator") + .item("ValuesSourceReaderOperator") + .item("ProjectOperator") + .item("ExchangeSinkOperator") + ); + case "node_reduce" -> assertThat(sig, matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator")); + case "final" -> assertMap( + sig, + matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("ProjectOperator").item("OutputOperator") + ); + default -> throw new IllegalArgumentException("can't match " + description); + } } + } finally { + setTruncationWindowMax(null); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java index 1be024c9af76a..5d6c2890aef1f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java @@ -8,8 +8,15 @@ package org.elasticsearch.xpack.esql.optimizer; import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.stats.SearchStats; -public record LocalPhysicalOptimizerContext(EsqlFlags flags, Configuration configuration, FoldContext foldCtx, SearchStats searchStats) {} +public record LocalPhysicalOptimizerContext( + PhysicalSettings physicalSettings, + EsqlFlags flags, + Configuration configuration, + FoldContext foldCtx, + SearchStats searchStats +) {} diff --git 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java index 8ec6d6b4bee39..de2768fa8ca7b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java @@ -15,6 +15,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; import org.elasticsearch.xpack.esql.core.expression.NameId; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; @@ -28,6 +29,7 @@ import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -63,7 +65,12 @@ public class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimi @Override protected PhysicalPlan rule(TopNExec topNExec, LocalPhysicalOptimizerContext ctx) { - Pushable pushable = evaluatePushable(ctx.foldCtx(), topNExec, LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags())); + Pushable pushable = evaluatePushable( + ctx.physicalSettings(), + ctx.foldCtx(), + topNExec, + LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()) + ); return pushable.rewrite(topNExec); } @@ -124,15 +131,24 @@ public PhysicalPlan rewrite(TopNExec topNExec) { } } - private static Pushable evaluatePushable(FoldContext ctx, TopNExec topNExec, 
LucenePushdownPredicates lucenePushdownPredicates) { + private static Pushable evaluatePushable( + PhysicalSettings physicalSettings, + FoldContext ctx, + TopNExec topNExec, + LucenePushdownPredicates lucenePushdownPredicates + ) { PhysicalPlan child = topNExec.child(); if (child instanceof EsQueryExec queryExec && queryExec.canPushSorts() - && canPushDownOrders(topNExec.order(), lucenePushdownPredicates)) { + && canPushDownOrders(topNExec.order(), lucenePushdownPredicates) + && canPushLimit(topNExec, physicalSettings)) { // With the simplest case of `FROM index | SORT ...` we only allow pushing down if the sort is on a field return new PushableQueryExec(queryExec); } - if (child instanceof EvalExec evalExec && evalExec.child() instanceof EsQueryExec queryExec && queryExec.canPushSorts()) { + if (child instanceof EvalExec evalExec + && evalExec.child() instanceof EsQueryExec queryExec + && queryExec.canPushSorts() + && canPushLimit(topNExec, physicalSettings)) { // When we have an EVAL between the FROM and the SORT, we consider pushing down if the sort is on a field and/or // a distance function defined in the EVAL. We also move the EVAL to after the SORT. 
List orders = topNExec.order(); @@ -204,6 +220,10 @@ private static boolean canPushDownOrders(List orders, LucenePushdownPredi return orders.stream().allMatch(o -> isSortableAttribute.apply(o.child(), lucenePushdownPredicates)); } + private static boolean canPushLimit(TopNExec topn, PhysicalSettings physicalSettings) { + return topn.limit() instanceof Literal l && ((Number) l.value()).intValue() <= physicalSettings.luceneTopNLimit(); + } + private static List buildFieldSorts(List orders) { List sorts = new ArrayList<>(orders.size()); for (Order o : orders) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 51ca0d6d6bd96..02e88904bcd9a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -281,7 +281,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, int rowEstimatedSize = esQueryExec.estimatedRowSize(); int limit = esQueryExec.limit() != null ? 
(Integer) esQueryExec.limit().fold(context.foldCtx()) : NO_LIMIT; boolean scoring = esQueryExec.hasScoring(); - if (sorts != null && sorts.isEmpty() == false && limit < physicalSettings.luceneTopNLimit()) { + if (sorts != null && sorts.isEmpty() == false) { List> sortBuilders = new ArrayList<>(sorts.size()); long estimatedPerRowSortSize = 0; for (Sort sort : sorts) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index ef4f605b22d36..bc47c80b81605 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -179,16 +179,18 @@ private static void forEachRelation(PhysicalPlan plan, Consumer acti } public static PhysicalPlan localPlan( + PhysicalSettings physicalSettings, EsqlFlags flags, List searchContexts, Configuration configuration, FoldContext foldCtx, PhysicalPlan plan ) { - return localPlan(flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts)); + return localPlan(physicalSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts)); } public static PhysicalPlan localPlan( + PhysicalSettings physicalSettings, EsqlFlags flags, Configuration configuration, FoldContext foldCtx, @@ -197,7 +199,7 @@ public static PhysicalPlan localPlan( ) { final var logicalOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats)); var physicalOptimizer = new LocalPhysicalPlanOptimizer( - new LocalPhysicalOptimizerContext(flags, configuration, foldCtx, searchStats) + new LocalPhysicalOptimizerContext(physicalSettings, flags, configuration, foldCtx, searchStats) ); return localPlan(plan, logicalOptimizer, physicalOptimizer); diff --git 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 257b8db73f5b8..dd0ed2f97e770 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -638,6 +638,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) { LOGGER.debug("Received physical plan:\n{}", plan); var localPlan = PlannerUtils.localPlan( + physicalSettings, context.flags(), context.searchExecutionContexts(), context.configuration(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 2ff6ce71be516..c41ce8fb9517e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverRunner; @@ -87,6 +88,7 @@ import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlan; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.TestPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; @@ -725,8 +727,10 @@ void executeSubPlan( 
if (dataNodePlan != null) { var searchStats = new DisabledSearchStats(); var logicalTestOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats)); + var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); + var flags = new EsqlFlags(true); var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer( - new LocalPhysicalOptimizerContext(new EsqlFlags(true), configuration, foldCtx, searchStats) + new LocalPhysicalOptimizerContext(physicalSettings, flags, configuration, foldCtx, searchStats) ); var csvDataNodePhysicalPlan = PlannerUtils.localPlan(dataNodePlan, logicalTestOptimizer, physicalTestOptimizer); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java index eedd9aea3cbc1..d7ec0de78d7c1 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java @@ -12,7 +12,9 @@ import org.apache.lucene.search.IndexSearcher; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.Fuzziness; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.MapperService; @@ -96,6 +98,7 @@ import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; import org.elasticsearch.xpack.esql.planner.FilterTests; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; 
import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery; @@ -2470,8 +2473,11 @@ public void testVerifierOnMissingReferences() throws Exception { } private LocalPhysicalPlanOptimizer getCustomRulesLocalPhysicalPlanOptimizer(List> batches) { + var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); + var flags = new EsqlFlags(true); LocalPhysicalOptimizerContext context = new LocalPhysicalOptimizerContext( - new EsqlFlags(true), + physicalSettings, + flags, config, FoldContext.small(), SearchStats.EMPTY diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index c151d1f2f082c..aca654c9c6ff8 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -7884,7 +7884,9 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP // The TopN needs an estimated row size for the planner to work var plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(EstimatesRowSize.estimateRowSize(0, plan), config); plan = useDataNodePlan ? 
plans.v2() : plans.v1(); - plan = PlannerUtils.localPlan(new EsqlFlags(true), config, FoldContext.small(), plan, TEST_SEARCH_STATS); + var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); + var flags = new EsqlFlags(true); + plan = PlannerUtils.localPlan(physicalSettings, flags, config, FoldContext.small(), plan, TEST_SEARCH_STATS); ExchangeSinkHandler exchangeSinkHandler = new ExchangeSinkHandler(null, 10, () -> 10); LocalExecutionPlanner planner = new LocalExecutionPlanner( "test", @@ -8261,7 +8263,9 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats) { // individually hence why here the plan is kept as is var l = p.transformUp(FragmentExec.class, fragment -> { - var localPlan = PlannerUtils.localPlan(new EsqlFlags(true), config, FoldContext.small(), fragment, searchStats); + var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); + var flags = new EsqlFlags(true); + var localPlan = PlannerUtils.localPlan(physicalSettings, flags, config, FoldContext.small(), fragment, searchStats); return EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), localPlan); }); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java index 4bd6fa6737041..92e598d908872 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java @@ -7,12 +7,15 @@ package org.elasticsearch.xpack.esql.optimizer; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.analysis.Analyzer; import 
org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.parser.EsqlParser; import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; @@ -74,8 +77,9 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, E var logicalTestOptimizer = new LocalLogicalPlanOptimizer( new LocalLogicalOptimizerContext(config, FoldContext.small(), searchStats) ); + var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer( - new LocalPhysicalOptimizerContext(esqlFlags, config, FoldContext.small(), searchStats), + new LocalPhysicalOptimizerContext(physicalSettings, esqlFlags, config, FoldContext.small(), searchStats), true ); var l = PlannerUtils.localPlan(physicalPlan, logicalTestOptimizer, physicalTestOptimizer); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java index 9f0744edf0b13..82a90ba283754 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java @@ -8,6 +8,8 @@ package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.lucene.DataPartitioning; import 
org.elasticsearch.geometry.Geometry; import org.elasticsearch.geometry.utils.GeometryValidator; import org.elasticsearch.geometry.utils.WellKnownBinary; @@ -35,6 +37,7 @@ import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.stats.SearchStats; @@ -417,7 +420,13 @@ private static void assertNoPushdownSort(TestPhysicalPlanBuilder builder, String private static PhysicalPlan pushTopNToSource(TopNExec topNExec) { var configuration = EsqlTestUtils.configuration("from test"); - var ctx = new LocalPhysicalOptimizerContext(new EsqlFlags(true), configuration, FoldContext.small(), SearchStats.EMPTY); + var ctx = new LocalPhysicalOptimizerContext( + new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000), + new EsqlFlags(true), + configuration, + FoldContext.small(), + SearchStats.EMPTY + ); var pushTopNToSource = new PushTopNToSource(); return pushTopNToSource.rule(topNExec, ctx); } From 6fa73c37632d8879965bcb45bb1883f757a79aa8 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 12 Sep 2025 14:47:51 -0400 Subject: [PATCH 5/9] fix --- .../org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 05b33013b30b7..219a3a5d76a80 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ 
b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -576,7 +576,7 @@ private void testTopNProfile(boolean giant) throws IOException { Map result = runEsql(builder); ListMatcher values = matchesList(); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < Math.min(1000, size); i++) { values = values.item(List.of(i)); } assertResultMap( From c6d05dab7276c64ed2bca96697a08e3b29fe1e2a Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 17 Sep 2025 14:58:00 -0400 Subject: [PATCH 6/9] Better way --- .../esql/optimizer/rules/physical/local/PushTopNToSource.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java index de2768fa8ca7b..798e58c64bf3d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java @@ -15,10 +15,10 @@ import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.FoldContext; -import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; import org.elasticsearch.xpack.esql.core.expression.NameId; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; +import org.elasticsearch.xpack.esql.expression.Foldables; import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.BinarySpatialFunction; import 
org.elasticsearch.xpack.esql.expression.function.scalar.spatial.SpatialRelatesUtils; @@ -221,7 +221,7 @@ private static boolean canPushDownOrders(List orders, LucenePushdownPredi } private static boolean canPushLimit(TopNExec topn, PhysicalSettings physicalSettings) { - return topn.limit() instanceof Literal l && ((Number) l.value()).intValue() <= physicalSettings.luceneTopNLimit(); + return Foldables.limitValue(topn.limit(), topn.sourceText()) <= physicalSettings.luceneTopNLimit(); } private static List buildFieldSorts(List orders) { From 82b4f2582625d8748cbe6b3fe1087caab9e7023e Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 19 Sep 2025 13:02:41 -0400 Subject: [PATCH 7/9] serverless --- .../xpack/esql/qa/single_node/RestEsqlIT.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 21bc117dbe0ad..419ccced11e5b 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -612,7 +612,13 @@ private void testTopNProfile(boolean giant) throws IOException { .item("ProjectOperator") .item("ExchangeSinkOperator") ); - case "node_reduce" -> assertThat(sig, matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator")); + case "node_reduce" -> assertThat( + sig, + // If the coordinating node and data node are the same node then we get this + either(matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator")) + // If the coordinating node and data node are *not* the same node we get this + 
.or(matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("ExchangeSinkOperator")) + ); case "final" -> assertMap( sig, matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("ProjectOperator").item("OutputOperator") From e4bc310a71837d1373bc0cd3504ccbe1d8b056a2 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Sun, 21 Sep 2025 20:57:49 -0400 Subject: [PATCH 8/9] one more --- .../compute/operator/topn/TopNOperator.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index 86032780bec20..ba522bf130f82 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -55,6 +55,8 @@ public class TopNOperator implements Operator, Accountable { static final class Row implements Accountable, Releasable { private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Row.class); + private final CircuitBreaker breaker; + /** * The sort key. 
*/ @@ -81,17 +83,11 @@ static final class Row implements Accountable, Releasable { @Nullable RefCounted shardRefCounter; - void setShardRefCountersAndShard(RefCounted shardRefCounter) { - if (this.shardRefCounter != null) { - this.shardRefCounter.decRef(); - } - this.shardRefCounter = shardRefCounter; - this.shardRefCounter.mustIncRef(); - } - Row(CircuitBreaker breaker, List sortOrders, int preAllocatedKeysSize, int preAllocatedValueSize) { + this.breaker = breaker; boolean success = false; try { + breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "topn"); keys = new BreakingBytesRefBuilder(breaker, "topn", preAllocatedKeysSize); values = new BreakingBytesRefBuilder(breaker, "topn", preAllocatedValueSize); bytesOrder = new BytesOrder(sortOrders, breaker, "topn"); @@ -111,7 +107,7 @@ public long ramBytesUsed() { @Override public void close() { clearRefCounters(); - Releasables.closeExpectNoException(keys, values, bytesOrder); + Releasables.closeExpectNoException(() -> breaker.addWithoutBreaking(-SHALLOW_SIZE), keys, values, bytesOrder); } public void clearRefCounters() { @@ -120,6 +116,14 @@ public void clearRefCounters() { } shardRefCounter = null; } + + void setShardRefCountersAndShard(RefCounted shardRefCounter) { + if (this.shardRefCounter != null) { + this.shardRefCounter.decRef(); + } + this.shardRefCounter = shardRefCounter; + this.shardRefCounter.mustIncRef(); + } } static final class BytesOrder implements Releasable, Accountable { From bfc240ac3a24ca12de767b6f70fbb3ddab51a5a8 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 22 Sep 2025 09:31:22 -0400 Subject: [PATCH 9/9] Please stop --- .../compute/operator/topn/TopNOperatorTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index 80f92fbcf91a5..97f301e252fd2 
100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -1489,9 +1489,9 @@ public void testRowResizes() { block.decRef(); op.addInput(new Page(blocks)); - // 94 are from the collection process + // 105 are from the objects // 1 is for the min-heap itself - assertThat(breaker.getMemoryRequestCount(), is(95L)); + assertThat(breaker.getMemoryRequestCount(), is(106L)); } }