From 0568a4d6cf11a6da1bb29cfc8ba3c02ba19615c5 Mon Sep 17 00:00:00 2001 From: afoucret Date: Thu, 17 Jul 2025 12:49:25 +0200 Subject: [PATCH 1/7] ESQL: Add asynchronous pre-optimization step for logical plan --- .../_nightly/esql/QueryPlanningBenchmark.java | 16 ++- .../xpack/esql/execution/PlanExecutor.java | 3 + .../optimizer/LogicalPlanPreOptimizer.java | 50 ++++++++ .../optimizer/LogicalPreOptimizerContext.java | 43 +++++++ .../xpack/esql/plan/logical/LogicalPlan.java | 9 ++ .../xpack/esql/session/EsqlSession.java | 17 +-- .../elasticsearch/xpack/esql/CsvTests.java | 1 + .../LogicalPlanPreOptimizerTests.java | 110 ++++++++++++++++++ 8 files changed, 238 insertions(+), 11 deletions(-) create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizer.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPreOptimizerContext.java create mode 100644 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java index 3b4d445002073..2a99bcb96f1e0 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java @@ -9,6 +9,7 @@ package org.elasticsearch.benchmark._nightly.esql; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.logging.LogConfigurator; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexMode; @@ -26,6 +27,8 @@ import org.elasticsearch.xpack.esql.inference.InferenceResolution; import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext; import org.elasticsearch.xpack.esql.parser.EsqlParser; import org.elasticsearch.xpack.esql.parser.QueryParams; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; @@ -70,6 +73,7 @@ public class QueryPlanningBenchmark { private EsqlParser defaultParser; private Analyzer manyFieldsAnalyzer; private LogicalPlanOptimizer defaultOptimizer; + private LogicalPlanPreOptimizer defaultPreOptimizer; private Configuration config; @Setup @@ -112,18 +116,22 @@ public void setup() { ), new Verifier(new Metrics(functionRegistry), new XPackLicenseState(() -> 0L)) ); + defaultOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config, FoldContext.small())); + defaultPreOptimizer = new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(FoldContext.small())); } - private LogicalPlan plan(EsqlParser parser, Analyzer analyzer, LogicalPlanOptimizer optimizer, String query) { + private LogicalPlan plan(EsqlParser parser, Analyzer analyzer, LogicalPlanOptimizer optimizer, LogicalPlanPreOptimizer preOptimizer, String query) { + PlainActionFuture future = new PlainActionFuture<>(); var parsed = parser.createStatement(query, new QueryParams(), telemetry, config); var analyzed = analyzer.analyze(parsed); - var optimized = optimizer.optimize(analyzed); - return optimized; + analyzed.setAnalyzed(); + preOptimizer.preOptimize(analyzed, future.map(optimizer::optimize)); + return future.actionGet(); } @Benchmark public void manyFields(Blackhole blackhole) { - blackhole.consume(plan(defaultParser, manyFieldsAnalyzer, defaultOptimizer, "FROM test | LIMIT 10")); + blackhole.consume(plan(defaultParser, manyFieldsAnalyzer, defaultOptimizer, defaultPreOptimizer, "FROM test | LIMIT 10")); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java index 620d86650abf4..414e1f372ea3f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java @@ -21,6 +21,8 @@ import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.querylog.EsqlQueryLog; @@ -85,6 +87,7 @@ public void esql( indexResolver, enrichPolicyResolver, preAnalyzer, + new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(foldContext)), functionRegistry, new LogicalPlanOptimizer(new LogicalOptimizerContext(cfg, foldContext)), mapper, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizer.java new file mode 100644 index 0000000000000..fdd8e1318f636 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizer.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; + +/** + * The class is responsible for invoking any steps that need to be applied to the logical plan, + * before this is being optimized. + *

+ * This is useful, especially if you need to execute some async tasks before the plan is optimized. + *

+ */ +public class LogicalPlanPreOptimizer { + + private final LogicalPreOptimizerContext preOptimizerContext; + + public LogicalPlanPreOptimizer(LogicalPreOptimizerContext preOptimizerContext) { + this.preOptimizerContext = preOptimizerContext; + } + + /** + * Pre-optimize a logical plan. + * + * @param plan the analyzed logical plan to pre-optimize + * @param listener the listener returning the pre-optimized plan when pre-optimization is complete + */ + public void preOptimize(LogicalPlan plan, ActionListener listener) { + if (plan.analyzed() == false) { + listener.onFailure(new IllegalStateException("Expected analyzed plan")); + return; + } + + doPreOptimize(plan, listener.delegateFailureAndWrap((l, preOptimized) -> { + preOptimized.setPreOptimized(); + listener.onResponse(preOptimized); + })); + } + + private void doPreOptimize(LogicalPlan plan, ActionListener listener) { + // this is where we will be executing async tasks + listener.onResponse(plan); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPreOptimizerContext.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPreOptimizerContext.java new file mode 100644 index 0000000000000..d082bd56fc46d --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPreOptimizerContext.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer; + +import org.elasticsearch.xpack.esql.core.expression.FoldContext; + +import java.util.Objects; + +public class LogicalPreOptimizerContext { + + private final FoldContext foldCtx; + + public LogicalPreOptimizerContext(FoldContext foldCtx) { + this.foldCtx = foldCtx; + } + + public FoldContext foldCtx() { + return foldCtx; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (obj == null || obj.getClass() != this.getClass()) return false; + var that = (LogicalPreOptimizerContext) obj; + return this.foldCtx.equals(that.foldCtx); + } + + @Override + public int hashCode() { + return Objects.hash(foldCtx); + } + + @Override + public String toString() { + return "LogicalPreOptimizerContext[foldCtx=" + foldCtx + ']'; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/LogicalPlan.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/LogicalPlan.java index ac4baea8bc853..762b22389ae24 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/LogicalPlan.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/LogicalPlan.java @@ -25,6 +25,7 @@ public enum Stage { PARSED, PRE_ANALYZED, ANALYZED, + PRE_OPTIMIZED, OPTIMIZED; } @@ -52,6 +53,14 @@ public void setAnalyzed() { stage = Stage.ANALYZED; } + public boolean preOptimized() { + return stage.ordinal() >= Stage.PRE_OPTIMIZED.ordinal(); + } + + public void setPreOptimized() { + stage = Stage.PRE_OPTIMIZED; + } + public boolean optimized() { return stage.ordinal() >= Stage.OPTIMIZED.ordinal(); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index df18051bcf721..e1dac72cdde07 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -71,6 +71,7 @@ import org.elasticsearch.xpack.esql.inference.InferenceResolution; import org.elasticsearch.xpack.esql.inference.InferenceRunner; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer; import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer; import org.elasticsearch.xpack.esql.parser.EsqlParser; @@ -147,6 +148,7 @@ public interface PlanRunner { private final PreAnalyzer preAnalyzer; private final Verifier verifier; private final EsqlFunctionRegistry functionRegistry; + private final LogicalPlanPreOptimizer logicalPlanPreOptimizer; private final LogicalPlanOptimizer logicalPlanOptimizer; private final PreMapper preMapper; @@ -168,6 +170,7 @@ public EsqlSession( IndexResolver indexResolver, EnrichPolicyResolver enrichPolicyResolver, PreAnalyzer preAnalyzer, + LogicalPlanPreOptimizer logicalPlanPreOptimizer, EsqlFunctionRegistry functionRegistry, LogicalPlanOptimizer logicalPlanOptimizer, Mapper mapper, @@ -181,6 +184,7 @@ public EsqlSession( this.indexResolver = indexResolver; this.enrichPolicyResolver = enrichPolicyResolver; this.preAnalyzer = preAnalyzer; + this.logicalPlanPreOptimizer = logicalPlanPreOptimizer; this.verifier = verifier; this.functionRegistry = functionRegistry; this.mapper = mapper; @@ -212,11 +216,10 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { - LogicalPlan optimizedPlan = optimizedPlan(analyzedPlan); - preMapper.preMapper( - optimizedPlan, - listener.delegateFailureAndWrap((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l)) - ); + SubscribableListener.newForked(l -> logicalPlanPreOptimizer.preOptimize(analyzedPlan, l)) + .andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l)) + .andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l)) + .addListener(listener); } }); } @@ -1043,8 +1046,8 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu } public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) { - if (logicalPlan.analyzed() == false) { - throw new IllegalStateException("Expected analyzed plan"); + if (logicalPlan.preOptimized() == false) { + throw new IllegalStateException("Expected pre-optimized plan"); } var plan = logicalPlanOptimizer.optimize(logicalPlan); LOGGER.debug("Optimized logicalPlan plan:\n{}", plan); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 62280a38ba608..7183dac6f0e53 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -582,6 +582,7 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { null, null, null, + null, functionRegistry, new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration, foldCtx)), mapper, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java new file mode 100644 index 0000000000000..bba12e73b4e0c --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.expression.function.scalar.string.Concat; +import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Add; +import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.Filter; +import org.elasticsearch.xpack.esql.plan.logical.Limit; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.Project; + +import java.util.List; + +import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.fieldAttribute; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.of; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class LogicalPlanPreOptimizerTests extends ESTestCase { + + public void testPlanIsMarkedAsPreOptimized() throws Exception { + for (int round = 0; round < 100; round++) { + // We want to make sure that the pre-optimizer woks for a wide range of plans + preOptimizedPlan(randomPlan()); + } + } + + public void testPreOptimizeFailsIfPlanIsNotAnalyzed() throws Exception { + LogicalPlan plan = EsqlTestUtils.relation(); + SetOnce exceptionHolder = new SetOnce<>(); + + preOptimizer().preOptimize(plan, ActionListener.wrap(r -> fail("Should have failed"), exceptionHolder::set)); + assertBusy(() -> { + assertThat(exceptionHolder.get(), notNullValue()); + IllegalStateException e = as(exceptionHolder.get(), IllegalStateException.class); + assertThat(e.getMessage(), equalTo("Expected analyzed plan")); + }); + } + + public LogicalPlan preOptimizedPlan(LogicalPlan plan) throws Exception { + // set plan as analyzed + plan.setPreOptimized(); + + SetOnce resultHolder = new SetOnce<>(); + SetOnce exceptionHolder = new SetOnce<>(); + + preOptimizer().preOptimize(plan, ActionListener.wrap(resultHolder::set, exceptionHolder::set)); + + if (exceptionHolder.get() != null) { + throw exceptionHolder.get(); + } + + assertThat(resultHolder.get(), notNullValue()); + assertThat(resultHolder.get().preOptimized(), equalTo(true)); + + return resultHolder.get(); + } + + private LogicalPlanPreOptimizer preOptimizer() { + LogicalPreOptimizerContext preOptimizerContext = new LogicalPreOptimizerContext(FoldContext.small()); + return new LogicalPlanPreOptimizer(preOptimizerContext); + } + + private LogicalPlan randomPlan() { + LogicalPlan plan = EsqlTestUtils.relation(); + int numCommands = between(0, 100); + + for (int i = 0; i < numCommands; i++) { + plan = switch (randomInt(3)) { + case 0 -> new Eval(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), randomExpression()))); + case 1 -> new Limit(Source.EMPTY, of(randomInt()), plan); + case 2 -> new Filter(Source.EMPTY, plan, randomExpression()); + default -> new Project(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), fieldAttribute()))); + }; + } + return plan; + } + + private Expression randomExpression() { + return switch (randomInt(3)) { + case 0 -> of(randomInt()); + case 1 -> of(randomIdentifier()); + case 2 -> new Add(Source.EMPTY, of(randomInt()), of(randomDouble())); + default -> new Concat(Source.EMPTY, of(randomIdentifier()), randomList(1, 10, () -> of(randomIdentifier()))); + }; + } + + private Expression randomCondition() { + if (randomBoolean()) { + return EsqlTestUtils.equalsOf(randomExpression(), randomExpression()); + } + + return EsqlTestUtils.greaterThanOf(randomExpression(), randomExpression()); + } +} From 5da14bbb1651381c25629df016616b3643f71466 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 17 Jul 2025 12:45:57 +0000 Subject: [PATCH 2/7] [CI] Auto commit changes from spotless --- .../benchmark/_nightly/esql/QueryPlanningBenchmark.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java index 2a99bcb96f1e0..db97db98e6a2b 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java @@ -121,7 +121,13 @@ public void setup() { defaultPreOptimizer = new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(FoldContext.small())); } - private LogicalPlan plan(EsqlParser parser, Analyzer analyzer, LogicalPlanOptimizer optimizer, LogicalPlanPreOptimizer preOptimizer, String query) { + private LogicalPlan plan( + EsqlParser parser, + Analyzer analyzer, + LogicalPlanOptimizer optimizer, + LogicalPlanPreOptimizer preOptimizer, + String query + ) { PlainActionFuture future = new PlainActionFuture<>(); var parsed = parser.createStatement(query, new QueryParams(), telemetry, config); var analyzed = analyzer.analyze(parsed); From 38aa860690040beb553f8ccc39dfda4436d2b583 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20FOUCRET?= Date: Thu, 17 Jul 2025 15:37:33 +0200 Subject: [PATCH 3/7] Fix copy//paste --- .../xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java index bba12e73b4e0c..faf997ef762da 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java @@ -84,7 +84,7 @@ private LogicalPlan randomPlan() { plan = switch (randomInt(3)) { case 0 -> new Eval(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), randomExpression()))); case 1 -> new Limit(Source.EMPTY, of(randomInt()), plan); - case 2 -> new Filter(Source.EMPTY, plan, randomExpression()); +case 2 -> new Filter(Source.EMPTY, plan, randomCondition()); default -> new Project(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), fieldAttribute()))); }; } From a4d27ac3a308b2b24020f53ad7cd312b9300a88a Mon Sep 17 00:00:00 2001 From: afoucret Date: Thu, 17 Jul 2025 15:41:34 +0200 Subject: [PATCH 4/7] Revert uselss change in QueryPlanningBenchmark --- .../_nightly/esql/QueryPlanningBenchmark.java | 22 ++++--------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java index db97db98e6a2b..3b4d445002073 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java @@ -9,7 +9,6 @@ package org.elasticsearch.benchmark._nightly.esql; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.logging.LogConfigurator; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexMode; @@ -27,8 +26,6 @@ import org.elasticsearch.xpack.esql.inference.InferenceResolution; import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; -import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer; -import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext; import org.elasticsearch.xpack.esql.parser.EsqlParser; import org.elasticsearch.xpack.esql.parser.QueryParams; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; @@ -73,7 +70,6 @@ public class QueryPlanningBenchmark { private EsqlParser defaultParser; private Analyzer manyFieldsAnalyzer; private LogicalPlanOptimizer defaultOptimizer; - private LogicalPlanPreOptimizer defaultPreOptimizer; private Configuration config; @Setup @@ -116,28 +112,18 @@ public void setup() { ), new Verifier(new Metrics(functionRegistry), new XPackLicenseState(() -> 0L)) ); - defaultOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config, FoldContext.small())); - defaultPreOptimizer = new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(FoldContext.small())); } - private LogicalPlan plan( - EsqlParser parser, - Analyzer analyzer, - LogicalPlanOptimizer optimizer, - LogicalPlanPreOptimizer preOptimizer, - String query - ) { - PlainActionFuture future = new PlainActionFuture<>(); + private LogicalPlan plan(EsqlParser parser, Analyzer analyzer, LogicalPlanOptimizer optimizer, String query) { var parsed = parser.createStatement(query, new QueryParams(), telemetry, config); var analyzed = analyzer.analyze(parsed); - analyzed.setAnalyzed(); - preOptimizer.preOptimize(analyzed, future.map(optimizer::optimize)); - return future.actionGet(); + var optimized = optimizer.optimize(analyzed); + return optimized; } @Benchmark public void manyFields(Blackhole blackhole) { - blackhole.consume(plan(defaultParser, manyFieldsAnalyzer, defaultOptimizer, defaultPreOptimizer, "FROM test | LIMIT 10")); + blackhole.consume(plan(defaultParser, manyFieldsAnalyzer, defaultOptimizer, "FROM test | LIMIT 10")); } } From d4e857da444cf59cae7f9f9abe6a7f558462cb2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20FOUCRET?= Date: Thu, 17 Jul 2025 15:45:59 +0200 Subject: [PATCH 5/7] Lint --- .../xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java index faf997ef762da..5356cfc6c0b5d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java @@ -84,7 +84,7 @@ private LogicalPlan randomPlan() { plan = switch (randomInt(3)) { case 0 -> new Eval(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), randomExpression()))); case 1 -> new Limit(Source.EMPTY, of(randomInt()), plan); -case 2 -> new Filter(Source.EMPTY, plan, randomCondition()); + case 2 -> new Filter(Source.EMPTY, plan, randomCondition()); default -> new Project(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), fieldAttribute()))); }; } From b061fc7550eb10ffb198f01036d3b3eb3e8820cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20FOUCRET?= Date: Thu, 17 Jul 2025 15:48:30 +0200 Subject: [PATCH 6/7] Lint again --- .../xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java index 5356cfc6c0b5d..8e573dd1cf3c9 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java @@ -84,7 +84,7 @@ private LogicalPlan randomPlan() { plan = switch (randomInt(3)) { case 0 -> new Eval(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), randomExpression()))); case 1 -> new Limit(Source.EMPTY, of(randomInt()), plan); - case 2 -> new Filter(Source.EMPTY, plan, randomCondition()); + case 2 -> new Filter(Source.EMPTY, plan, randomCondition()); default -> new Project(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), fieldAttribute()))); }; } From 26d815e5602b358e6022b1585eabb94c3cc00068 Mon Sep 17 00:00:00 2001 From: afoucret Date: Thu, 17 Jul 2025 17:05:58 +0200 Subject: [PATCH 7/7] Fix CsvTests --- .../xpack/esql/session/EsqlSession.java | 6 ++- .../elasticsearch/xpack/esql/CsvTests.java | 39 +++++++++++-------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index e1dac72cdde07..4590cb8c6d3bd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -216,7 +216,7 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { - SubscribableListener.newForked(l -> logicalPlanPreOptimizer.preOptimize(analyzedPlan, l)) + SubscribableListener.newForked(l -> preOptimizedPlan(analyzedPlan, l)) .andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l)) .andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l)) .addListener(listener); @@ -1054,6 +1054,10 @@ public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) { return plan; } + public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener listener) { + logicalPlanPreOptimizer.preOptimize(logicalPlan, listener); + } + public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) { if (optimizedPlan.optimized() == false) { throw new IllegalStateException("Expected optimized plan"); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 7183dac6f0e53..b38b3089823d5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -73,6 +73,8 @@ import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.TestLocalPhysicalPlanOptimizer; import org.elasticsearch.xpack.esql.parser.EsqlParser; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -582,7 +584,7 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { null, null, null, - null, + new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(foldCtx)), functionRegistry, new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration, foldCtx)), mapper, @@ -595,24 +597,27 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); - session.executeOptimizedPlan( - new EsqlQueryRequest(), - new EsqlExecutionInfo(randomBoolean()), - planRunner(bigArrays, foldCtx, physicalOperationProviders), - session.optimizedPlan(analyzed), - listener.delegateFailureAndWrap( - // Wrap so we can capture the warnings in the calling thread - (next, result) -> next.onResponse( - new ActualResults( - result.schema().stream().map(Attribute::name).toList(), - result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(), - result.schema().stream().map(Attribute::dataType).toList(), - result.pages(), - threadPool.getThreadContext().getResponseHeaders() + session.preOptimizedPlan(analyzed, listener.delegateFailureAndWrap((l, preOptimized) -> { + session.executeOptimizedPlan( + new EsqlQueryRequest(), + new EsqlExecutionInfo(randomBoolean()), + planRunner(bigArrays, foldCtx, physicalOperationProviders), + session.optimizedPlan(preOptimized), + listener.delegateFailureAndWrap( + // Wrap so we can capture the warnings in the calling thread + (next, result) -> next.onResponse( + new ActualResults( + result.schema().stream().map(Attribute::name).toList(), + result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(), + result.schema().stream().map(Attribute::dataType).toList(), + result.pages(), + threadPool.getThreadContext().getResponseHeaders() + ) ) ) - ) - ); + ); + })); + return listener.get(); }