Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer;
import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.querylog.EsqlQueryLog;
Expand Down Expand Up @@ -85,6 +87,7 @@ public void esql(
indexResolver,
enrichPolicyResolver,
preAnalyzer,
new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(foldContext)),
functionRegistry,
new LogicalPlanOptimizer(new LogicalOptimizerContext(cfg, foldContext)),
mapper,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

/**
* The class is responsible for invoking any steps that need to be applied to the logical plan,
* before this is being optimized.
* <p>
* This is useful, especially if you need to execute some async tasks before the plan is optimized.
* </p>
*/
public class LogicalPlanPreOptimizer {

private final LogicalPreOptimizerContext preOptimizerContext;

public LogicalPlanPreOptimizer(LogicalPreOptimizerContext preOptimizerContext) {
this.preOptimizerContext = preOptimizerContext;
}

/**
* Pre-optimize a logical plan.
*
* @param plan the analyzed logical plan to pre-optimize
* @param listener the listener returning the pre-optimized plan when pre-optimization is complete
*/
public void preOptimize(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
if (plan.analyzed() == false) {
listener.onFailure(new IllegalStateException("Expected analyzed plan"));
return;
}

doPreOptimize(plan, listener.delegateFailureAndWrap((l, preOptimized) -> {
preOptimized.setPreOptimized();
listener.onResponse(preOptimized);
}));
}

private void doPreOptimize(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
// this is where we will be executing async tasks
listener.onResponse(plan);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.core.expression.FoldContext;

import java.util.Objects;

public class LogicalPreOptimizerContext {

private final FoldContext foldCtx;

public LogicalPreOptimizerContext(FoldContext foldCtx) {
this.foldCtx = foldCtx;
}

public FoldContext foldCtx() {
return foldCtx;
}

@Override
public boolean equals(Object obj) {
if (obj == this) return true;
if (obj == null || obj.getClass() != this.getClass()) return false;
var that = (LogicalPreOptimizerContext) obj;
return this.foldCtx.equals(that.foldCtx);
}

@Override
public int hashCode() {
return Objects.hash(foldCtx);
}

@Override
public String toString() {
return "LogicalPreOptimizerContext[foldCtx=" + foldCtx + ']';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public enum Stage {
PARSED,
PRE_ANALYZED,
ANALYZED,
PRE_OPTIMIZED,
OPTIMIZED;
}

Expand Down Expand Up @@ -52,6 +53,14 @@ public void setAnalyzed() {
stage = Stage.ANALYZED;
}

public boolean preOptimized() {
return stage.ordinal() >= Stage.PRE_OPTIMIZED.ordinal();
}

public void setPreOptimized() {
stage = Stage.PRE_OPTIMIZED;
}

public boolean optimized() {
return stage.ordinal() >= Stage.OPTIMIZED.ordinal();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.elasticsearch.xpack.esql.inference.InferenceResolution;
import org.elasticsearch.xpack.esql.inference.InferenceRunner;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer;
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer;
import org.elasticsearch.xpack.esql.parser.EsqlParser;
Expand Down Expand Up @@ -147,6 +148,7 @@ public interface PlanRunner {
private final PreAnalyzer preAnalyzer;
private final Verifier verifier;
private final EsqlFunctionRegistry functionRegistry;
private final LogicalPlanPreOptimizer logicalPlanPreOptimizer;
private final LogicalPlanOptimizer logicalPlanOptimizer;
private final PreMapper preMapper;

Expand All @@ -168,6 +170,7 @@ public EsqlSession(
IndexResolver indexResolver,
EnrichPolicyResolver enrichPolicyResolver,
PreAnalyzer preAnalyzer,
LogicalPlanPreOptimizer logicalPlanPreOptimizer,
EsqlFunctionRegistry functionRegistry,
LogicalPlanOptimizer logicalPlanOptimizer,
Mapper mapper,
Expand All @@ -181,6 +184,7 @@ public EsqlSession(
this.indexResolver = indexResolver;
this.enrichPolicyResolver = enrichPolicyResolver;
this.preAnalyzer = preAnalyzer;
this.logicalPlanPreOptimizer = logicalPlanPreOptimizer;
this.verifier = verifier;
this.functionRegistry = functionRegistry;
this.mapper = mapper;
Expand Down Expand Up @@ -212,11 +216,10 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
@Override
public void onResponse(LogicalPlan analyzedPlan) {
LogicalPlan optimizedPlan = optimizedPlan(analyzedPlan);
preMapper.preMapper(
optimizedPlan,
listener.delegateFailureAndWrap((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
);
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
.addListener(listener);
}
});
}
Expand Down Expand Up @@ -1043,14 +1046,18 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu
}

public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
if (logicalPlan.analyzed() == false) {
throw new IllegalStateException("Expected analyzed plan");
if (logicalPlan.preOptimized() == false) {
throw new IllegalStateException("Expected pre-optimized plan");
}
var plan = logicalPlanOptimizer.optimize(logicalPlan);
LOGGER.debug("Optimized logicalPlan plan:\n{}", plan);
return plan;
}

public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan> listener) {
logicalPlanPreOptimizer.preOptimize(logicalPlan, listener);
}

public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
if (optimizedPlan.optimized() == false) {
throw new IllegalStateException("Expected optimized plan");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer;
import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.TestLocalPhysicalPlanOptimizer;
import org.elasticsearch.xpack.esql.parser.EsqlParser;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
Expand Down Expand Up @@ -582,6 +584,7 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception {
null,
null,
null,
new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(foldCtx)),
functionRegistry,
new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration, foldCtx)),
mapper,
Expand All @@ -594,24 +597,27 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception {

PlainActionFuture<ActualResults> listener = new PlainActionFuture<>();

session.executeOptimizedPlan(
new EsqlQueryRequest(),
new EsqlExecutionInfo(randomBoolean()),
planRunner(bigArrays, foldCtx, physicalOperationProviders),
session.optimizedPlan(analyzed),
listener.delegateFailureAndWrap(
// Wrap so we can capture the warnings in the calling thread
(next, result) -> next.onResponse(
new ActualResults(
result.schema().stream().map(Attribute::name).toList(),
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
result.schema().stream().map(Attribute::dataType).toList(),
result.pages(),
threadPool.getThreadContext().getResponseHeaders()
session.preOptimizedPlan(analyzed, listener.delegateFailureAndWrap((l, preOptimized) -> {
session.executeOptimizedPlan(
new EsqlQueryRequest(),
new EsqlExecutionInfo(randomBoolean()),
planRunner(bigArrays, foldCtx, physicalOperationProviders),
session.optimizedPlan(preOptimized),
listener.delegateFailureAndWrap(
// Wrap so we can capture the warnings in the calling thread
(next, result) -> next.onResponse(
new ActualResults(
result.schema().stream().map(Attribute::name).toList(),
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
result.schema().stream().map(Attribute::dataType).toList(),
result.pages(),
threadPool.getThreadContext().getResponseHeaders()
)
)
)
)
);
);
}));

return listener.get();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.expression.function.scalar.string.Concat;
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Add;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;

import java.util.List;

import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.fieldAttribute;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.of;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

public class LogicalPlanPreOptimizerTests extends ESTestCase {

public void testPlanIsMarkedAsPreOptimized() throws Exception {
for (int round = 0; round < 100; round++) {
// We want to make sure that the pre-optimizer woks for a wide range of plans
preOptimizedPlan(randomPlan());
}
}

public void testPreOptimizeFailsIfPlanIsNotAnalyzed() throws Exception {
LogicalPlan plan = EsqlTestUtils.relation();
SetOnce<Exception> exceptionHolder = new SetOnce<>();

preOptimizer().preOptimize(plan, ActionListener.wrap(r -> fail("Should have failed"), exceptionHolder::set));
assertBusy(() -> {
assertThat(exceptionHolder.get(), notNullValue());
IllegalStateException e = as(exceptionHolder.get(), IllegalStateException.class);
assertThat(e.getMessage(), equalTo("Expected analyzed plan"));
});
}

public LogicalPlan preOptimizedPlan(LogicalPlan plan) throws Exception {
// set plan as analyzed
plan.setPreOptimized();

SetOnce<LogicalPlan> resultHolder = new SetOnce<>();
SetOnce<Exception> exceptionHolder = new SetOnce<>();

preOptimizer().preOptimize(plan, ActionListener.wrap(resultHolder::set, exceptionHolder::set));

if (exceptionHolder.get() != null) {
throw exceptionHolder.get();
}

assertThat(resultHolder.get(), notNullValue());
assertThat(resultHolder.get().preOptimized(), equalTo(true));

return resultHolder.get();
}

private LogicalPlanPreOptimizer preOptimizer() {
LogicalPreOptimizerContext preOptimizerContext = new LogicalPreOptimizerContext(FoldContext.small());
return new LogicalPlanPreOptimizer(preOptimizerContext);
}

private LogicalPlan randomPlan() {
LogicalPlan plan = EsqlTestUtils.relation();
int numCommands = between(0, 100);

for (int i = 0; i < numCommands; i++) {
plan = switch (randomInt(3)) {
case 0 -> new Eval(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), randomExpression())));
case 1 -> new Limit(Source.EMPTY, of(randomInt()), plan);
case 2 -> new Filter(Source.EMPTY, plan, randomCondition());
default -> new Project(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), fieldAttribute())));
};
}
return plan;
}

private Expression randomExpression() {
return switch (randomInt(3)) {
case 0 -> of(randomInt());
case 1 -> of(randomIdentifier());
case 2 -> new Add(Source.EMPTY, of(randomInt()), of(randomDouble()));
default -> new Concat(Source.EMPTY, of(randomIdentifier()), randomList(1, 10, () -> of(randomIdentifier())));
};
}

private Expression randomCondition() {
if (randomBoolean()) {
return EsqlTestUtils.equalsOf(randomExpression(), randomExpression());
}

return EsqlTestUtils.greaterThanOf(randomExpression(), randomExpression());
}
}