diff --git a/docs/changelog/134497.yaml b/docs/changelog/134497.yaml new file mode 100644 index 0000000000000..3fc5e53e68e97 --- /dev/null +++ b/docs/changelog/134497.yaml @@ -0,0 +1,5 @@ +pr: 134497 +summary: Limit when we push topn to lucene +area: ES|QL +type: bug +issues: [] diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index a71ff6162cd84..d5a12db64d291 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -92,6 +92,7 @@ public void skipOnAborted() { */ public void testSortByManyLongsSuccess() throws IOException { initManyLongs(10); + // | SORT a, b, i0, i1, ...i500 | KEEP a, b | LIMIT 10000 Map response = sortByManyLongs(500); ListMatcher columns = matchesList().item(matchesMap().entry("name", "a").entry("type", "long")) .item(matchesMap().entry("name", "b").entry("type", "long")); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index 86032780bec20..ba522bf130f82 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -55,6 +55,8 @@ public class TopNOperator implements Operator, Accountable { static final class Row implements Accountable, Releasable { private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Row.class); + private final CircuitBreaker breaker; + /** * The sort key. 
*/ @@ -81,17 +83,11 @@ static final class Row implements Accountable, Releasable { @Nullable RefCounted shardRefCounter; - void setShardRefCountersAndShard(RefCounted shardRefCounter) { - if (this.shardRefCounter != null) { - this.shardRefCounter.decRef(); - } - this.shardRefCounter = shardRefCounter; - this.shardRefCounter.mustIncRef(); - } - Row(CircuitBreaker breaker, List sortOrders, int preAllocatedKeysSize, int preAllocatedValueSize) { + this.breaker = breaker; boolean success = false; try { + breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "topn"); keys = new BreakingBytesRefBuilder(breaker, "topn", preAllocatedKeysSize); values = new BreakingBytesRefBuilder(breaker, "topn", preAllocatedValueSize); bytesOrder = new BytesOrder(sortOrders, breaker, "topn"); @@ -111,7 +107,7 @@ public long ramBytesUsed() { @Override public void close() { clearRefCounters(); - Releasables.closeExpectNoException(keys, values, bytesOrder); + Releasables.closeExpectNoException(() -> breaker.addWithoutBreaking(-SHALLOW_SIZE), keys, values, bytesOrder); } public void clearRefCounters() { @@ -120,6 +116,14 @@ public void clearRefCounters() { } shardRefCounter = null; } + + void setShardRefCountersAndShard(RefCounted shardRefCounter) { + if (this.shardRefCounter != null) { + this.shardRefCounter.decRef(); + } + this.shardRefCounter = shardRefCounter; + this.shardRefCounter.mustIncRef(); + } } static final class BytesOrder implements Releasable, Accountable { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index 80f92fbcf91a5..97f301e252fd2 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -1489,9 +1489,9 @@ public void 
testRowResizes() { block.decRef(); op.addInput(new Page(blocks)); - // 94 are from the collection process + // 105 are from the objects // 1 is for the min-heap itself - assertThat(breaker.getMemoryRequestCount(), is(95L)); + assertThat(breaker.getMemoryRequestCount(), is(106L)); } } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 09ecd7a260109..419ccced11e5b 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -59,6 +59,7 @@ import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.elasticsearch.xpack.esql.core.type.DataType.isMillisOrNanos; +import static org.elasticsearch.xpack.esql.planner.PhysicalSettings.LUCENE_TOPN_LIMIT; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC; import static org.elasticsearch.xpack.esql.tools.ProfileParser.parseProfile; import static org.elasticsearch.xpack.esql.tools.ProfileParser.readProfileFromResponse; @@ -188,6 +189,18 @@ private void setLoggingLevel(String level) throws IOException { client().performRequest(request); } + private void setTruncationWindowMax(Integer size) throws IOException { + Request request = new Request("PUT", "/_cluster/settings"); + request.setJsonEntity(""" + { + "persistent": { + "esql.query.result_truncation_max_size": $SIZE$ + } + } + """.replace("$SIZE$", size == null ? 
"null" : Integer.toString(size))); + client().performRequest(request); + } + public void testIncompatibleMappingsErrors() throws IOException { // create first index Request request = new Request("PUT", "/index1"); @@ -538,6 +551,86 @@ public void testInlineStatsProfile() throws IOException { ); } + public void testSmallTopNProfile() throws IOException { + testTopNProfile(false); + } + + public void testGiantTopNProfile() throws IOException { + testTopNProfile(true); + } + + private void testTopNProfile(boolean giant) throws IOException { + try { + setTruncationWindowMax(1000000); + indexTimestampData(1); + + int size = between(1, LUCENE_TOPN_LIMIT.get(Settings.EMPTY).intValue() - 1); + if (giant) { + size += LUCENE_TOPN_LIMIT.get(Settings.EMPTY).intValue(); + } + RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | KEEP value | SORT value ASC | LIMIT " + size); + + builder.pragmas(Settings.builder().put("data_partitioning", "shard").build()); + builder.profile(true); + builder.pragmasOk(); + + Map result = runEsql(builder); + ListMatcher values = matchesList(); + for (int i = 0; i < Math.min(1000, size); i++) { + values = values.item(List.of(i)); + } + assertResultMap( + result, + getResultMatcher(result).entry("profile", getProfileMatcher()), + matchesList().item(matchesMap().entry("name", "value").entry("type", "long")), + values + ); + + @SuppressWarnings("unchecked") + List> profiles = (List>) ((Map) result.get("profile")).get("drivers"); + for (Map p : profiles) { + fixTypesOnProfile(p); + assertThat(p, commonProfile()); + List sig = new ArrayList<>(); + @SuppressWarnings("unchecked") + List> operators = (List>) p.get("operators"); + for (Map o : operators) { + sig.add(checkOperatorProfile(o)); + } + String description = p.get("description").toString(); + switch (description) { + case "data" -> assertMap( + sig, + giant + ? 
matchesList().item("LuceneSourceOperator") + .item("ValuesSourceReaderOperator") + .item("TopNOperator") + .item("ProjectOperator") + .item("ExchangeSinkOperator") + : matchesList().item("LuceneTopNSourceOperator") + .item("ValuesSourceReaderOperator") + .item("ProjectOperator") + .item("ExchangeSinkOperator") + ); + case "node_reduce" -> assertThat( + sig, + // If the coordinating node and data node are the same node then we get this + either(matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator")) + // If the coordinating node and data node are *not* the same node we get this + .or(matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("ExchangeSinkOperator")) + ); + case "final" -> assertMap( + sig, + matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("ProjectOperator").item("OutputOperator") + ); + default -> throw new IllegalArgumentException("can't match " + description); + } + } + } finally { + setTruncationWindowMax(null); + } + } + public void testForceSleepsProfile() throws IOException { assumeTrue("requires pragmas", Build.current().isSnapshot()); @@ -940,7 +1033,9 @@ private String checkOperatorProfile(Map o) { .entry("rows_received", greaterThan(0)) .entry("rows_emitted", greaterThan(0)) .entry("ram_used", instanceOf(String.class)) - .entry("ram_bytes_used", greaterThan(0)); + .entry("ram_bytes_used", greaterThan(0)) + .entry("receive_nanos", greaterThan(0)) + .entry("emit_nanos", greaterThan(0)); case "LuceneTopNSourceOperator" -> matchesMap().entry("pages_emitted", greaterThan(0)) .entry("rows_emitted", greaterThan(0)) .entry("current", greaterThan(0)) @@ -951,7 +1046,8 @@ private String checkOperatorProfile(Map o) { .entry("slice_min", 0) .entry("process_nanos", greaterThan(0)) .entry("processed_queries", List.of("*:*")) - .entry("slice_index", 0); + .entry("slice_index", 0) + .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD")); default -> throw new 
AssertionError("unexpected status: " + o); }; MapMatcher expectedOp = matchesMap().entry("operator", startsWith(name)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java index 1be024c9af76a..5d6c2890aef1f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java @@ -8,8 +8,15 @@ package org.elasticsearch.xpack.esql.optimizer; import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.stats.SearchStats; -public record LocalPhysicalOptimizerContext(EsqlFlags flags, Configuration configuration, FoldContext foldCtx, SearchStats searchStats) {} +public record LocalPhysicalOptimizerContext( + PhysicalSettings physicalSettings, + EsqlFlags flags, + Configuration configuration, + FoldContext foldCtx, + SearchStats searchStats +) {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java index 8ec6d6b4bee39..798e58c64bf3d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; import org.elasticsearch.xpack.esql.core.expression.NameId; import 
org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; +import org.elasticsearch.xpack.esql.expression.Foldables; import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.BinarySpatialFunction; import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.SpatialRelatesUtils; @@ -28,6 +29,7 @@ import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -63,7 +65,12 @@ public class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimi @Override protected PhysicalPlan rule(TopNExec topNExec, LocalPhysicalOptimizerContext ctx) { - Pushable pushable = evaluatePushable(ctx.foldCtx(), topNExec, LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags())); + Pushable pushable = evaluatePushable( + ctx.physicalSettings(), + ctx.foldCtx(), + topNExec, + LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()) + ); return pushable.rewrite(topNExec); } @@ -124,15 +131,24 @@ public PhysicalPlan rewrite(TopNExec topNExec) { } } - private static Pushable evaluatePushable(FoldContext ctx, TopNExec topNExec, LucenePushdownPredicates lucenePushdownPredicates) { + private static Pushable evaluatePushable( + PhysicalSettings physicalSettings, + FoldContext ctx, + TopNExec topNExec, + LucenePushdownPredicates lucenePushdownPredicates + ) { PhysicalPlan child = topNExec.child(); if (child instanceof EsQueryExec queryExec && queryExec.canPushSorts() - && canPushDownOrders(topNExec.order(), lucenePushdownPredicates)) { + && canPushDownOrders(topNExec.order(), lucenePushdownPredicates) + && canPushLimit(topNExec, physicalSettings)) { // With the simplest case of `FROM index | SORT ...` we only allow pushing down if the sort is on a field 
return new PushableQueryExec(queryExec); } - if (child instanceof EvalExec evalExec && evalExec.child() instanceof EsQueryExec queryExec && queryExec.canPushSorts()) { + if (child instanceof EvalExec evalExec + && evalExec.child() instanceof EsQueryExec queryExec + && queryExec.canPushSorts() + && canPushLimit(topNExec, physicalSettings)) { // When we have an EVAL between the FROM and the SORT, we consider pushing down if the sort is on a field and/or // a distance function defined in the EVAL. We also move the EVAL to after the SORT. List orders = topNExec.order(); @@ -204,6 +220,10 @@ private static boolean canPushDownOrders(List orders, LucenePushdownPredi return orders.stream().allMatch(o -> isSortableAttribute.apply(o.child(), lucenePushdownPredicates)); } + private static boolean canPushLimit(TopNExec topn, PhysicalSettings physicalSettings) { + return Foldables.limitValue(topn.limit(), topn.sourceText()) <= physicalSettings.luceneTopNLimit(); + } + private static List buildFieldSorts(List orders) { List sorts = new ArrayList<>(orders.size()); for (Order o : orders) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 0b022186984b5..7b9966bc7926b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -281,7 +281,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, int rowEstimatedSize = esQueryExec.estimatedRowSize(); int limit = esQueryExec.limit() != null ? 
(Integer) esQueryExec.limit().fold(context.foldCtx()) : NO_LIMIT; boolean scoring = esQueryExec.hasScoring(); - if ((sorts != null && sorts.isEmpty() == false)) { + if (sorts != null && sorts.isEmpty() == false) { List> sortBuilders = new ArrayList<>(sorts.size()); long estimatedPerRowSortSize = 0; for (Sort sort : sorts) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java index 4276eeaf39f9b..369fc73d4fdd3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java @@ -9,9 +9,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.MemorySizeValue; import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.monitor.jvm.JvmInfo; /** @@ -35,8 +37,17 @@ public class PhysicalSettings { Setting.Property.Dynamic ); + public static final Setting LUCENE_TOPN_LIMIT = Setting.intSetting( + "esql.lucene_topn_limit", + IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(Settings.EMPTY), + -1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private volatile DataPartitioning defaultDataPartitioning; private volatile ByteSizeValue valuesLoadingJumboSize; + private volatile int luceneTopNLimit; /** * Ctor for prod that listens for updates from the {@link ClusterService}. 
@@ -44,14 +55,16 @@ public class PhysicalSettings { public PhysicalSettings(ClusterService clusterService) { clusterService.getClusterSettings().initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v); clusterService.getClusterSettings().initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v); + clusterService.getClusterSettings().initializeAndWatch(LUCENE_TOPN_LIMIT, v -> this.luceneTopNLimit = v); } /** * Ctor for testing. */ - public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize) { + public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize, int luceneTopNLimit) { this.defaultDataPartitioning = defaultDataPartitioning; this.valuesLoadingJumboSize = valuesLoadingJumboSize; + this.luceneTopNLimit = luceneTopNLimit; } public DataPartitioning defaultDataPartitioning() { @@ -61,4 +74,22 @@ public DataPartitioning defaultDataPartitioning() { public ByteSizeValue valuesLoadingJumboSize() { return valuesLoadingJumboSize; } + + /** + * Maximum {@code LIMIT} that we're willing to push to Lucene's topn. + *

+ * Lucene's topn code was designed for search + * which typically fetches 10 or 30 or 50 or 100 or 1000 documents. + * That's as many as you want on a page, and that's what it's designed for. + * But if you go to, say, page 10, Lucene implements this as a search + * for {@code page_size * page_number} docs and then materializes only + * the last {@code page_size} documents. Traditionally, Elasticsearch + * limits that {@code page_size * page_number} which it calls the + * {@link IndexSettings#MAX_RESULT_WINDOW_SETTING "result window"}. + * So! ESQL defaults to the same default - {@code 10,000}. + *

+ */ + public int luceneTopNLimit() { + return luceneTopNLimit; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 8296eeede62ec..fc7184d01976e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -180,16 +180,18 @@ private static void forEachRelation(PhysicalPlan plan, Consumer acti } public static PhysicalPlan localPlan( + PhysicalSettings physicalSettings, EsqlFlags flags, List searchContexts, Configuration configuration, FoldContext foldCtx, PhysicalPlan plan ) { - return localPlan(flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts)); + return localPlan(physicalSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts)); } public static PhysicalPlan localPlan( + PhysicalSettings physicalSettings, EsqlFlags flags, Configuration configuration, FoldContext foldCtx, @@ -198,7 +200,7 @@ public static PhysicalPlan localPlan( ) { final var logicalOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats)); var physicalOptimizer = new LocalPhysicalPlanOptimizer( - new LocalPhysicalOptimizerContext(flags, configuration, foldCtx, searchStats) + new LocalPhysicalOptimizerContext(physicalSettings, flags, configuration, foldCtx, searchStats) ); return localPlan(plan, logicalOptimizer, physicalOptimizer); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index c120c23c0f19c..ef72beff1ebf8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -638,6 +638,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) { LOGGER.debug("Received physical plan:\n{}", plan); var localPlan = PlannerUtils.localPlan( + physicalSettings, context.flags(), context.searchExecutionContexts(), context.configuration(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index df203980d43ef..c026718bf178a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -278,6 +278,7 @@ public List> getSettings() { ESQL_QUERYLOG_INCLUDE_USER_SETTING, PhysicalSettings.DEFAULT_DATA_PARTITIONING, PhysicalSettings.VALUES_LOADING_JUMBO_SIZE, + PhysicalSettings.LUCENE_TOPN_LIMIT, STORED_FIELDS_SEQUENTIAL_PROPORTION, EsqlFlags.ESQL_STRING_LIKE_ON_INDEX, EsqlFlags.ESQL_ROUNDTO_PUSHDOWN_THRESHOLD diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index d445cf443dc06..172017d9cde90 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverRunner; @@ -87,6 +88,7 @@ import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import 
org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlan; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.TestPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; @@ -725,8 +727,10 @@ void executeSubPlan( if (dataNodePlan != null) { var searchStats = new DisabledSearchStats(); var logicalTestOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats)); + var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); + var flags = new EsqlFlags(true); var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer( - new LocalPhysicalOptimizerContext(new EsqlFlags(true), configuration, foldCtx, searchStats) + new LocalPhysicalOptimizerContext(physicalSettings, flags, configuration, foldCtx, searchStats) ); var csvDataNodePhysicalPlan = PlannerUtils.localPlan(dataNodePlan, logicalTestOptimizer, physicalTestOptimizer); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java index affe9c26cc67b..d4d57809bdc04 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java @@ -12,7 +12,9 @@ import org.apache.lucene.search.IndexSearcher; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.Fuzziness; +import org.elasticsearch.compute.lucene.DataPartitioning; import 
org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.MapperService; @@ -96,6 +98,7 @@ import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; import org.elasticsearch.xpack.esql.planner.FilterTests; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery; @@ -2472,8 +2475,11 @@ public void testVerifierOnMissingReferences() throws Exception { } private LocalPhysicalPlanOptimizer getCustomRulesLocalPhysicalPlanOptimizer(List> batches) { + var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); + var flags = new EsqlFlags(true); LocalPhysicalOptimizerContext context = new LocalPhysicalOptimizerContext( - new EsqlFlags(true), + physicalSettings, + flags, config, FoldContext.small(), SearchStats.EMPTY diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 336551f9d0996..b6b8367379b37 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -7876,7 +7876,9 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP // The TopN needs an estimated row size for the planner to work var plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(EstimatesRowSize.estimateRowSize(0, plan), config); plan = useDataNodePlan ? 
plans.v2() : plans.v1(); - plan = PlannerUtils.localPlan(new EsqlFlags(true), config, FoldContext.small(), plan, TEST_SEARCH_STATS); + var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); + var flags = new EsqlFlags(true); + plan = PlannerUtils.localPlan(physicalSettings, flags, config, FoldContext.small(), plan, TEST_SEARCH_STATS); ExchangeSinkHandler exchangeSinkHandler = new ExchangeSinkHandler(null, 10, () -> 10); LocalExecutionPlanner planner = new LocalExecutionPlanner( "test", @@ -7895,7 +7897,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP FoldContext.small(), List.of(), null, - new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1)) + new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000) ), List.of() ); @@ -8253,7 +8255,9 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats) { // individually hence why here the plan is kept as is var l = p.transformUp(FragmentExec.class, fragment -> { - var localPlan = PlannerUtils.localPlan(new EsqlFlags(true), config, FoldContext.small(), fragment, searchStats); + var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); + var flags = new EsqlFlags(true); + var localPlan = PlannerUtils.localPlan(physicalSettings, flags, config, FoldContext.small(), fragment, searchStats); return EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), localPlan); }); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java index 0d1b4dbe73c93..6602c461c1735 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java @@ -7,6 +7,8 @@ package 
org.elasticsearch.xpack.esql.optimizer; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.analysis.Analyzer; import org.elasticsearch.xpack.esql.core.expression.FoldContext; @@ -14,6 +16,7 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; @@ -75,8 +78,9 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, E var logicalTestOptimizer = new LocalLogicalPlanOptimizer( new LocalLogicalOptimizerContext(config, FoldContext.small(), searchStats) ); + var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer( - new LocalPhysicalOptimizerContext(esqlFlags, config, FoldContext.small(), searchStats), + new LocalPhysicalOptimizerContext(physicalSettings, esqlFlags, config, FoldContext.small(), searchStats), true ); var l = PlannerUtils.localPlan(physicalPlan, logicalTestOptimizer, physicalTestOptimizer); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java index 9f0744edf0b13..82a90ba283754 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java +++ 
b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java @@ -8,6 +8,8 @@ package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.geometry.Geometry; import org.elasticsearch.geometry.utils.GeometryValidator; import org.elasticsearch.geometry.utils.WellKnownBinary; @@ -35,6 +37,7 @@ import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; +import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.stats.SearchStats; @@ -417,7 +420,13 @@ private static void assertNoPushdownSort(TestPhysicalPlanBuilder builder, String private static PhysicalPlan pushTopNToSource(TopNExec topNExec) { var configuration = EsqlTestUtils.configuration("from test"); - var ctx = new LocalPhysicalOptimizerContext(new EsqlFlags(true), configuration, FoldContext.small(), SearchStats.EMPTY); + var ctx = new LocalPhysicalOptimizerContext( + new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000), + new EsqlFlags(true), + configuration, + FoldContext.small(), + SearchStats.EMPTY + ); var pushTopNToSource = new PushTopNToSource(); return pushTopNToSource.rule(topNExec, ctx); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java index 6430967584553..23bc2f2b2990b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java +++ 
b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java @@ -82,7 +82,7 @@ record TestCase(QueryBuilder query, List nullsFilteredFields) { FoldContext.small(), List.of(new EsPhysicalOperationProviders.DefaultShardContext(0, () -> {}, createMockContext(), AliasFilter.EMPTY)), null, - new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1)) + new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000) ); for (TestCase testCase : testCases) { EsQueryExec queryExec = new EsQueryExec( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 30d05ad09fc15..014733ab005b6 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -330,7 +330,7 @@ private EsPhysicalOperationProviders esPhysicalOperationProviders(List