Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -435,9 +435,6 @@ tests:
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackLookupJoinIT
method: testLookupExplosionBigString
issue: https://github.com/elastic/elasticsearch/issues/138510
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
method: test {csv-spec:inlinestats.MvMinMvExpand}
issue: https://github.com/elastic/elasticsearch/issues/137679
- class: org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SubstituteRoundToTests
method: testSubqueryWithCountStarAndDateTrunc {default}
issue: https://github.com/elastic/elasticsearch/issues/138601
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
import org.elasticsearch.xpack.esql.rule.ParameterizedRule;

Expand All @@ -34,9 +35,40 @@
import static org.elasticsearch.xpack.esql.core.expression.Attribute.rawTemporaryName;

/**
* Replaces vector similarity functions with a field attribute that applies
* the similarity function during value loading, when one side of the function is a literal.
* It also adds the new field function attribute to the EsRelation output, and adds a projection after it to remove it from the output.
* Replaces {@link Expression}s that can be pushed to field loading with a field attribute
* that calculates the expression during value loading. See {@link BlockLoaderExpression}
* for more about how these loads are implemented and why we do this.
* <p>
* This rule runs in one downward (aka output-to-read) pass, making four sorts
* of transformations:
* </p>
* <ul>
* <li>
* When we see a use of a <strong>new</strong> pushable function we build an
* attribute for the function, record that attribute, and discard it after use.
* For example, {@code EVAL l = LENGTH(message)} becomes
* {@code EVAL l = $$message$LENGTH$1324$$ | DROP $$message$LENGTH$1324$$ }.
* We need the {@code DROP} so we don't change the output schema.
* </li>
* <li>
* When we see a use of a pushable function for which we already have an attribute
* we just use it. This looks like the {@code l} attribute in
* {@code EVAL l = LENGTH(message) | EVAL l2 = LENGTH(message)}.
* </li>
* <li>
* When we see a PROJECT, add any new attributes to the projection so we can use
* them on previously visited nodes. So {@code KEEP foo | EVAL l = LENGTH(message)}
* becomes
* <pre>{@code
* | KEEP foo, $$message$LENGTH$1324$$
* | EVAL l = $$message$LENGTH$1324$$
* | DROP $$message$LENGTH$1324$$
* }</pre>
* </li>
* <li>
* When we see a relation, add the attribute to it.
* </li>
* </ul>
*/
public class PushExpressionsToFieldLoad extends ParameterizedRule<LogicalPlan, LogicalPlan, LocalLogicalOptimizerContext> {

Expand All @@ -56,45 +88,46 @@ private class Rule {
* The primary indices, lazily initialized.
*/
private List<EsRelation> primaries;
private boolean planWasTransformed = false;
private boolean addedNewAttribute = false;

/**
 * Builds a rule instance bound to one optimizer context and one plan.
 *
 * @param context the local logical optimizer context stored for later use by this rule
 * @param plan the plan stored on this rule instance
 */
private Rule(LocalLogicalOptimizerContext context, LogicalPlan plan) {
    // The two assignments are independent of each other.
    this.plan = plan;
    this.context = context;
}

/**
 * Applies this rule to a single plan node.
 * <p>
 * {@link Eval}, {@link Filter} and {@link Aggregate} nodes are handed to
 * {@code transformPotentialInvocation}. For any other node nothing is done while
 * {@code addedAttrs} is empty; otherwise {@link Project} nodes go to
 * {@code transformProject} and {@link EsRelation} nodes go to {@code transformRelation}.
 * Every other node is returned unchanged.
 * </p>
 *
 * @param plan the node to inspect
 * @return the node to use in place of {@code plan}, possibly {@code plan} itself
 */
private LogicalPlan doRule(LogicalPlan plan) {
// Reset the per-call flags before looking at this node.
planWasTransformed = false;
addedNewAttribute = false;
if (plan instanceof Eval || plan instanceof Filter || plan instanceof Aggregate) {
// NOTE(review): transformedPlan is not read anywhere else in this method, and the
// return below passes the original plan on. This looks like a leftover — confirm
// and remove if so.
LogicalPlan transformedPlan = plan.transformExpressionsOnly(Expression.class, e -> {
if (e instanceof BlockLoaderExpression ble) {
return transformExpression(e, ble);
}
return e;
});
return transformPotentialInvocation(plan);
}
// Nothing has been recorded in addedAttrs, so there is nothing to add to projections or relations.
if (addedAttrs.isEmpty()) {
return plan;
}
if (plan instanceof Project project) {
return transformProject(project);
}
if (plan instanceof EsRelation rel) {
return transformRelation(rel);
}
// Any other node type is left as it is.
return plan;
}

// TODO rebuild everything one time rather than after each find.
if (planWasTransformed == false) {
return plan;
private LogicalPlan transformPotentialInvocation(LogicalPlan plan) {
LogicalPlan transformedPlan = plan.transformExpressionsOnly(Expression.class, e -> {
if (e instanceof BlockLoaderExpression ble) {
return transformExpression(e, ble);
}

List<Attribute> previousAttrs = transformedPlan.output();
// Transforms EsRelation to extract the new attributes
List<Attribute> addedAttrsList = addedAttrs.values().stream().toList();
transformedPlan = transformedPlan.transformDown(EsRelation.class, esRelation -> {
AttributeSet updatedOutput = esRelation.outputSet().combine(AttributeSet.of(addedAttrsList));
return esRelation.withAttributes(updatedOutput.stream().toList());
});
// Transforms Projects so the new attribute is not discarded
transformedPlan = transformedPlan.transformDown(EsqlProject.class, esProject -> {
List<NamedExpression> projections = new ArrayList<>(esProject.projections());
projections.addAll(addedAttrsList);
return esProject.withProjections(projections);
});

return new EsqlProject(Source.EMPTY, transformedPlan, previousAttrs);
return e;
});
if (addedNewAttribute == false) {
/*
* Either didn't see anything pushable or everything pushable already
* has a pushed attribute.
*/
return plan;
}
return plan;
// Found a new pushable attribute, discard it *after* use so we don't modify the output.
return new EsqlProject(Source.EMPTY, transformedPlan, transformedPlan.output());
}

private Expression transformExpression(Expression e, BlockLoaderExpression ble) {
Expand All @@ -109,10 +142,23 @@ private Expression transformExpression(Expression e, BlockLoaderExpression ble)
if (context.searchStats().supportsLoaderConfig(fuse.field().fieldName(), fuse.config(), preference) == false) {
return e;
}
planWasTransformed = true;
addedNewAttribute = true;
return replaceFieldsForFieldTransformations(e, fuse);
}

/**
 * Returns a copy of {@code project} whose projections are the original ones followed by
 * every attribute currently held in {@code addedAttrs}, so those attributes are not
 * dropped by the projection.
 *
 * @param project the projection to widen
 * @return {@code project} rebuilt with the extended projection list
 */
private LogicalPlan transformProject(Project project) {
    var existing = project.projections();
    // Start empty and append in order: original projections first, pushed attributes last.
    List<NamedExpression> widened = new ArrayList<>();
    widened.addAll(existing);
    widened.addAll(addedAttrs.values());
    return project.withProjections(widened);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing this on Project helps here.


/**
 * Returns a copy of {@code rel} whose attributes are its current output set combined
 * with every attribute currently held in {@code addedAttrs}.
 *
 * @param rel the relation to extend
 * @return {@code rel} rebuilt with the combined attribute list
 */
private LogicalPlan transformRelation(EsRelation rel) {
    AttributeSet current = rel.outputSet();
    AttributeSet pushed = AttributeSet.of(addedAttrs.values());
    AttributeSet combined = current.combine(pushed);
    return rel.withAttributes(combined.stream().toList());
}

private Expression replaceFieldsForFieldTransformations(Expression e, BlockLoaderExpression.PushedBlockLoaderExpression fuse) {
// Change the expression to a reference of the pushed down function on the field
FunctionEsField functionEsField = new FunctionEsField(fuse.field().field(), e.dataType(), fuse.config());
Expand Down