From e400761e8598612d4b9af96603b5115ec9ac38e0 Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Fri, 2 May 2025 19:19:21 +0200 Subject: [PATCH 1/5] Sketch solution --- .../esql/optimizer/LogicalPlanOptimizer.java | 2 + .../logical/PushDownJoinPastProject.java | 56 +++++++++++++++++++ .../optimizer/LogicalPlanOptimizerTests.java | 22 ++++++++ 3 files changed, 80 insertions(+) create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index 92dc75ed2ad1a..615bdfe02fc6c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -43,6 +43,7 @@ import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownCompletion; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownEnrich; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownEval; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownJoinPastProject; import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownRegexExtract; import org.elasticsearch.xpack.esql.optimizer.rules.logical.RemoveStatsOverride; import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceAggregateAggExpressionWithEval; @@ -199,6 +200,7 @@ protected static Batch operators() { new PushDownEval(), new PushDownRegexExtract(), new PushDownEnrich(), + new PushDownJoinPastProject(), new PushDownAndCombineOrderBy(), new PruneRedundantOrderBy(), new PruneRedundantSortClauses() diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java new file mode 100644 index 0000000000000..0022a99c98ab2 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.logical; + +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.Project; +import org.elasticsearch.xpack.esql.plan.logical.join.Join; + +/** + * This is different from e.g. {@link PushDownEnrich} because a {@link Join} is not a + * {@link org.elasticsearch.xpack.esql.plan.GeneratingPlan} as we cannot assign arbitrary names to the "added" columns, which is required + * to solve name conflicts while pushing down. (We could assign arbitrary qualifiers if we had them, however, to achieve the same.) + * + * This will turn + * \_Join[LEFT,[language_code],[language_code],[language_code]] + * |_Project[[language_code]] + * | \_EsRelation[test][other_field, language_code] + * \_EsRelation[languages_lookup][LOOKUP][language_code, language_name] + * + * into + * \_Project[[language_code, language_name]] + * \_Join[LEFT,[language_code],[language_code],[language_code]] <- LOOKUP JOIN languages_lookup ON language_code + * | \_EsRelation[test][other_field, language_code] + * \_EsRelation[languages_lookup][LOOKUP][language_code, language_name] + * + * which allows multiple {@link Project}s to be combined downstream so that determining which fields actually matter and need to be + * extraced becomes more precise. + * + * In case of name conflicts, a {@link Project} node has to remain upstream from the {@link Join} to perform a renaming; but we still get + * the benefit of precise field extractions as long as the {@link Project} can bubble downstream. + * + * E.g. + * \_Join[LEFT,[language_code],[language_code],[language_code]] + * |_Project[[language_code, language_name{f}#1, language_name{f}#1 AS foo]] <- conflict: language_name#1 shadowed by #2 but foo needs it + * | \_EsRelation[test][language_code, language_name{f}#1] + * \_EsRelation[languages_lookup][LOOKUP][language_code, language_name{f}#2] + * + * becomes + * \_Project[[language_code, $$temp$name AS foo, language_name{f}#2]] + * \_Join[LEFT,[language_code],[language_code],[language_code]] + * |_Project[[language_code, language_name{f}#1 AS $$temp$name]] <- we still need language_name#1 for foo + * | \_EsRelation[test][language_code, language_name{f}#1] + * \_EsRelation[languages_lookup][LOOKUP][language_code, language_name{f}#2] + */ +public final class PushDownJoinPastProject extends OptimizerRules.OptimizerRule { + @Override + protected LogicalPlan rule(Join join) { + // TODO: here + return join; + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index de56f0676b4a7..8619b88dd56e6 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -6802,6 +6802,28 @@ public void testMultipleLookupShadowing() { var limit3 = asLimit(eval.child(), 1000, false); } + public void testMultipleLookupProject() { + // TODO a test case where pushing down past the RENAME would shadow + // analogous to + // Project[[x{f}#1, y{f}#2 as z, $$y{r}#3 as y]] + // \_Eval[[2 * x{f}#1 as $$y]] + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()); + + String query = """ + FROM test + | EVAL language_code = languages + | LOOKUP JOIN languages_lookup ON language_code + | RENAME language_name AS foo + | LOOKUP JOIN languages_lookup ON language_code + | DROP foo + """; + + var plan = optimizedPlan(query); + + // TODO: here + assert false; + } + // // // From 958659a9e6d36b878a5499fd3d303933909da8cc Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Mon, 5 May 2025 11:52:07 +0200 Subject: [PATCH 2/5] Sketch it out some more --- .../logical/PushDownJoinPastProject.java | 36 ++++++++++++++++--- .../xpack/esql/plan/logical/join/Join.java | 3 ++ 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java index 0022a99c98ab2..d9d39aa9ae05a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java @@ -7,11 +7,21 @@ package org.elasticsearch.xpack.esql.optimizer.rules.logical; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.join.Join; +import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; /** + * Pushdown a LEFT {@link Join} past a {@link Project} in its left child. + * * This is different from e.g. {@link PushDownEnrich} because a {@link Join} is not a * {@link org.elasticsearch.xpack.esql.plan.GeneratingPlan} as we cannot assign arbitrary names to the "added" columns, which is required * to solve name conflicts while pushing down. (We could assign arbitrary qualifiers if we had them, however, to achieve the same.) @@ -31,7 +41,8 @@ * which allows multiple {@link Project}s to be combined downstream so that determining which fields actually matter and need to be * extraced becomes more precise. * - * In case of name conflicts, a {@link Project} node has to remain upstream from the {@link Join} to perform a renaming; but we still get + * There can be name conflicts, where a field added to the left output via the {@link Join} shadows a field that was used to perform a + * rename before. In such cases, an {@link Eval} node has to remain upstream from the {@link Join} to perform the rename. But we still get * the benefit of precise field extractions as long as the {@link Project} can bubble downstream. * * E.g. @@ -41,16 +52,33 @@ * \_EsRelation[languages_lookup][LOOKUP][language_code, language_name{f}#2] * * becomes - * \_Project[[language_code, $$temp$name AS foo, language_name{f}#2]] + * \_Project[[language_code, foo, language_name{f}#2]] * \_Join[LEFT,[language_code],[language_code],[language_code]] - * |_Project[[language_code, language_name{f}#1 AS $$temp$name]] <- we still need language_name#1 for foo + * |_Eval[[language_name{f}#1 AS foo]] <- we still need language_name#1 for foo * | \_EsRelation[test][language_code, language_name{f}#1] * \_EsRelation[languages_lookup][LOOKUP][language_code, language_name{f}#2] */ public final class PushDownJoinPastProject extends OptimizerRules.OptimizerRule { @Override protected LogicalPlan rule(Join join) { - // TODO: here + if (join.left() instanceof Project projectChild && JoinTypes.LEFT.equals(join.config().type())) { + // Collect the output of the Join; we'll construct the downstream Project from this. + List finalOutput = join.output(); + + // Find out which fields the Join adds to the fields of the left input. + Set rightOutputNames = join.rightOutputFields().stream().map(NamedExpression::name).collect(Collectors.toSet()); + // Collect the renames from the Project into an AttributeMap to resolve the Join's references. + // TODO + + // Also resolve the final projections; some of these can resolve to an attribute that is shadowed by the right fields. + // For such fields, we need to create a synthetic attribute with a temporary name and add an Eval upstream from the Join to + // preserve these attributes. + + // TODO: here + return join; + } + return join; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java index 5e1afe1452d99..3db551d317791 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java @@ -113,6 +113,9 @@ public AttributeSet rightReferences() { return Expressions.references(config().rightFields()); } + /** + * The output fields obtained from the right child. + */ public List rightOutputFields() { AttributeSet leftInputs = left().outputSet(); From d1961d03d69fc93d61357b38dd73ebc41715bc49 Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Mon, 5 May 2025 12:53:04 +0200 Subject: [PATCH 3/5] Start another approach --- .../lucene/ValuesSourceReaderOperator.java | 1 + .../esql/enrich/AbstractLookupService.java | 17 +++-- .../logical/PushDownJoinPastProject.java | 67 ++----------------- .../esql/planner/LocalExecutionPlanner.java | 3 +- 4 files changed, 18 insertions(+), 70 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java index 57819d6d7e041..7609ba2aa8488 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java @@ -105,6 +105,7 @@ public String describe() { * {@code blockLoader} maps shard index to the {@link BlockLoader}s * which load the actual blocks. */ + // TODO: enforce a FieldName at compile time public record FieldInfo(String name, ElementType type, IntFunction blockLoader) {} public record ShardContext(IndexReader reader, Supplier newSourceLoader, double storedFieldsSequentialProportion) {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index 42d03f4e1b161..f4bf5894a68a1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -68,6 +68,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; @@ -388,22 +389,20 @@ private static Operator extractFieldsOperator( ) { List fields = new ArrayList<>(extractFields.size()); for (NamedExpression extractField : extractFields) { + BlockLoader loader = shardContext.blockLoader( extractField instanceof Alias a ? ((NamedExpression) a.child()).name() : extractField.name(), extractField.dataType() == DataType.UNSUPPORTED, MappedFieldType.FieldExtractPreference.NONE ); + String physicalName = extractField instanceof FieldAttribute fa ? fa.fieldName() : extractField.name(); fields.add( - new ValuesSourceReaderOperator.FieldInfo( - extractField.name(), - PlannerUtils.toElementType(extractField.dataType()), - shardIdx -> { - if (shardIdx != 0) { - throw new IllegalStateException("only one shard"); - } - return loader; + new ValuesSourceReaderOperator.FieldInfo(physicalName, PlannerUtils.toElementType(extractField.dataType()), shardIdx -> { + if (shardIdx != 0) { + throw new IllegalStateException("only one shard"); } - ) + return loader; + }) ); } return new ValuesSourceReaderOperator( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java index d9d39aa9ae05a..56047b4f2931e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java @@ -7,74 +7,21 @@ package org.elasticsearch.xpack.esql.optimizer.rules.logical; -import org.elasticsearch.xpack.esql.core.expression.Attribute; -import org.elasticsearch.xpack.esql.core.expression.NamedExpression; -import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.join.Join; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * Pushdown a LEFT {@link Join} past a {@link Project} in its left child. - * - * This is different from e.g. {@link PushDownEnrich} because a {@link Join} is not a - * {@link org.elasticsearch.xpack.esql.plan.GeneratingPlan} as we cannot assign arbitrary names to the "added" columns, which is required - * to solve name conflicts while pushing down. (We could assign arbitrary qualifiers if we had them, however, to achieve the same.) - * - * This will turn - * \_Join[LEFT,[language_code],[language_code],[language_code]] - * |_Project[[language_code]] - * | \_EsRelation[test][other_field, language_code] - * \_EsRelation[languages_lookup][LOOKUP][language_code, language_name] - * - * into - * \_Project[[language_code, language_name]] - * \_Join[LEFT,[language_code],[language_code],[language_code]] <- LOOKUP JOIN languages_lookup ON language_code - * | \_EsRelation[test][other_field, language_code] - * \_EsRelation[languages_lookup][LOOKUP][language_code, language_name] - * - * which allows multiple {@link Project}s to be combined downstream so that determining which fields actually matter and need to be - * extraced becomes more precise. - * - * There can be name conflicts, where a field added to the left output via the {@link Join} shadows a field that was used to perform a - * rename before. In such cases, an {@link Eval} node has to remain upstream from the {@link Join} to perform the rename. But we still get - * the benefit of precise field extractions as long as the {@link Project} can bubble downstream. - * - * E.g. - * \_Join[LEFT,[language_code],[language_code],[language_code]] - * |_Project[[language_code, language_name{f}#1, language_name{f}#1 AS foo]] <- conflict: language_name#1 shadowed by #2 but foo needs it - * | \_EsRelation[test][language_code, language_name{f}#1] - * \_EsRelation[languages_lookup][LOOKUP][language_code, language_name{f}#2] - * - * becomes - * \_Project[[language_code, foo, language_name{f}#2]] - * \_Join[LEFT,[language_code],[language_code],[language_code]] - * |_Eval[[language_name{f}#1 AS foo]] <- we still need language_name#1 for foo - * | \_EsRelation[test][language_code, language_name{f}#1] - * \_EsRelation[languages_lookup][LOOKUP][language_code, language_name{f}#2] - */ public final class PushDownJoinPastProject extends OptimizerRules.OptimizerRule { @Override protected LogicalPlan rule(Join join) { - if (join.left() instanceof Project projectChild && JoinTypes.LEFT.equals(join.config().type())) { - // Collect the output of the Join; we'll construct the downstream Project from this. - List finalOutput = join.output(); - - // Find out which fields the Join adds to the fields of the left input. - Set rightOutputNames = join.rightOutputFields().stream().map(NamedExpression::name).collect(Collectors.toSet()); - // Collect the renames from the Project into an AttributeMap to resolve the Join's references. - // TODO - - // Also resolve the final projections; some of these can resolve to an attribute that is shadowed by the right fields. - // For such fields, we need to create a synthetic attribute with a temporary name and add an Eval upstream from the Join to - // preserve these attributes. - - // TODO: here + if (join.left() instanceof Project projectChild + && JoinTypes.LEFT.equals(join.config().type()) + && join.right() instanceof EsRelation lookupIndex + && lookupIndex.indexMode() == IndexMode.LOOKUP) { + // TODO: refactor the relevant part of pushGeneratingPlanPastProjectAndOrderBy and re-use it here. return join; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index abcd0ec1318ed..6ec395977dcf7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -718,7 +718,8 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan private record MatchConfig(String fieldName, int channel, DataType type) { private MatchConfig(FieldAttribute match, Layout.ChannelAndType input) { // Note, this handles TEXT fields with KEYWORD subfields - this(match.exactAttribute().name(), input.channel(), input.type()); + // TODO: This probably also led to bugs for LOOKUP JOIN on a union typed field, let's add a test. + this(match.exactAttribute().fieldName(), input.channel(), input.type()); } } From d15f5e4a9cdc2ecbe4a325f87b05b1ca9255f053 Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Tue, 6 May 2025 18:49:58 +0200 Subject: [PATCH 4/5] Implement the optimization and add a csv test --- .../esql/core/expression/Expressions.java | 4 - .../esql/core/expression/FieldAttribute.java | 2 +- .../src/main/resources/lookup-join.csv-spec | 24 +++ .../xpack/esql/action/EsqlCapabilities.java | 5 + .../esql/enrich/AbstractLookupService.java | 20 ++- .../logical/PushDownJoinPastProject.java | 157 +++++++++++++++++- .../xpack/esql/planner/TranslatorHandler.java | 2 +- .../optimizer/LogicalPlanOptimizerTests.java | 15 ++ 8 files changed, 214 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Expressions.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Expressions.java index 0ccc0d74f7025..821ef6b91105d 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Expressions.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Expressions.java @@ -20,10 +20,6 @@ public final class Expressions { private Expressions() {} - public static NamedExpression wrapAsNamed(Expression exp) { - return exp instanceof NamedExpression ne ? ne : new Alias(exp.source(), exp.sourceText(), exp); - } - public static List asAttributes(List named) { if (named.isEmpty()) { return emptyList(); diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/FieldAttribute.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/FieldAttribute.java index f8ddb43dcac8c..f28e29927b78a 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/FieldAttribute.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/FieldAttribute.java @@ -208,7 +208,7 @@ public FieldAttribute exactAttribute() { } private FieldAttribute innerField(EsField type) { - return new FieldAttribute(source(), name(), name() + "." + type.getName(), type, nullable(), id(), synthetic()); + return new FieldAttribute(source(), fieldName(), name() + "." + type.getName(), type, nullable(), id(), synthetic()); } @Override diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index b9c65707146ff..4b85b21580470 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -144,6 +144,30 @@ emp_no:integer | language_code:integer | language_name:keyword 10030 |0 | null ; +multipleLookupsAndProjects +required_capability: join_lookup_enable_pushdown_past_project +# TODO: this test, but with a lookup index that has multiple matches per row to show the first LOOKUP's effect, too. +# TODO: tests like this that will look up the exact subfield of a text field + +FROM employees +| KEEP languages, emp_no +| EVAL language_code = languages +| LOOKUP JOIN languages_lookup ON language_code +| RENAME language_name AS foo +| LOOKUP JOIN languages_lookup ON language_code +| DROP foo +| SORT emp_no +| LIMIT 5 +; + +languages:integer | emp_no:integer | language_code:integer | language_name:keyword +2 | 10001 | 2 | French +5 | 10002 | 5 | null +4 | 10003 | 4 | German +5 | 10004 | 5 | null +1 | 10005 | 1 | English +; + ########################################################################### # multiple match behavior with languages_lookup_non_unique_key index ########################################################################### diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index b201ab7cb4afe..ee2cc9c9c47c6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -782,6 +782,11 @@ public enum Cap { */ JOIN_LOOKUP_FIX_LIMIT_PUSHDOWN(JOIN_LOOKUP_V12.isEnabled()), + /** + * Enable Projects (KEEP/DROP and aliasing EVALs) to be pulled up past a LOOKUP JOIN. + */ + JOIN_LOOKUP_ENABLE_PUSHDOWN_PAST_PROJECT(JOIN_LOOKUP_V12.isEnabled()), + /** * Fix for https://github.com/elastic/elasticsearch/issues/117054 */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index f4bf5894a68a1..b9899f014dd18 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -389,20 +389,26 @@ private static Operator extractFieldsOperator( ) { List fields = new ArrayList<>(extractFields.size()); for (NamedExpression extractField : extractFields) { + String physicalName = extractField instanceof FieldAttribute fa ? fa.fieldName() + : extractField instanceof Alias a ? ((NamedExpression) a.child()).name() + : extractField.name(); BlockLoader loader = shardContext.blockLoader( - extractField instanceof Alias a ? ((NamedExpression) a.child()).name() : extractField.name(), + physicalName, extractField.dataType() == DataType.UNSUPPORTED, MappedFieldType.FieldExtractPreference.NONE ); - String physicalName = extractField instanceof FieldAttribute fa ? fa.fieldName() : extractField.name(); fields.add( - new ValuesSourceReaderOperator.FieldInfo(physicalName, PlannerUtils.toElementType(extractField.dataType()), shardIdx -> { - if (shardIdx != 0) { - throw new IllegalStateException("only one shard"); + new ValuesSourceReaderOperator.FieldInfo( + extractField.name(), + PlannerUtils.toElementType(extractField.dataType()), + shardIdx -> { + if (shardIdx != 0) { + throw new IllegalStateException("only one shard"); + } + return loader; } - return loader; - }) + ) ); } return new ValuesSourceReaderOperator( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java index 56047b4f2931e..35df21b694097 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java @@ -7,22 +7,175 @@ package org.elasticsearch.xpack.esql.optimizer.rules.logical; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.IndexMode; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.AttributeSet; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.NameId; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.plan.GeneratingPlan; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.Project; +import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.join.Join; +import org.elasticsearch.xpack.esql.plan.logical.join.JoinType; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Pushing down {@link Join}s past {@link Project}s has the benefit that field extraction can happen later. Also, once bubbled downstream, + * multiple projects can be combined which can eliminate some fields altogether (c.f. + * {@link org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction}). Even just field extractions before joins are + * expensive because joins create new rows in case of multiple matches, which means that the extracted columns need to be deeply copied + * and blown up. + * + * This follows the same approach as {@link PushDownUtils#pushGeneratingPlanPastProjectAndOrderBy(UnaryPlan)}. To deal with name conflicts, + * we rename the fields that a {@code LOOKUP JOIN} "generates" by renaming the {@link FieldAttribute}s in the join's right hand side + * {@link EsRelation}. Once we have qualifiers, this can be simplified by just assigning temporary qualifiers and later stripping them away. + */ public final class PushDownJoinPastProject extends OptimizerRules.OptimizerRule { + + /** + * Wrapper class for a {@link Join} representing a {@code LOOKUP JOIN}, so we can treat it as if it was a {@link UnaryPlan} that is also + * a {@link GeneratingPlan}; + */ + private class JoinAsUnaryGeneratingPlan extends UnaryPlan implements GeneratingPlan { + private final Join lookupJoin; + private List lazyGeneratedAttributes; + + JoinAsUnaryGeneratingPlan(Join lookupJoin) { + super(lookupJoin.source(), lookupJoin.left()); + this.lookupJoin = lookupJoin; + } + + private JoinAsUnaryGeneratingPlan( + Source source, + LogicalPlan left, + LogicalPlan right, + JoinType type, + List matchFields, + List leftFields, + List rightFields + ) { + this(new Join(source, left, right, type, matchFields, leftFields, rightFields)); + } + + Join unwrap() { + return lookupJoin; + } + + @Override + public UnaryPlan replaceChild(LogicalPlan newChild) { + return new JoinAsUnaryGeneratingPlan(lookupJoin.replaceChildren(newChild, lookupJoin.right())); + } + + @Override + public boolean expressionsResolved() { + return lookupJoin.expressionsResolved(); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create( + this, + JoinAsUnaryGeneratingPlan::new, + lookupJoin.left(), + lookupJoin.right(), + lookupJoin.config().type(), + lookupJoin.config().matchFields(), + lookupJoin.config().leftFields(), + lookupJoin.config().rightFields() + ); + } + + @Override + public String getWriteableName() { + throw new UnsupportedOperationException("lives only for a single optimizer rule application"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new UnsupportedOperationException("lives only for a single optimizer rule application"); + } + + @Override + public List generatedAttributes() { + if (lazyGeneratedAttributes == null) { + lazyGeneratedAttributes = lookupJoin.rightOutputFields(); + } + return lazyGeneratedAttributes; + } + + @Override + public JoinAsUnaryGeneratingPlan withGeneratedNames(List newNames) { + checkNumberOfNewNames(newNames); + + if (lookupJoin.right() instanceof EsRelation esRelation) { + AttributeSet generatedSet = AttributeSet.of(generatedAttributes()); + int numOutputAttributes = esRelation.output().size(); + List newAttributes = new ArrayList<>(numOutputAttributes); + // The match field from the LOOKUP JOIN's right hand side EsRelation is not added to the output from the left hand side. + // It's not part of the "generated" attributes and needs to be skipped. + int newNamesIndex = 0; + for (Attribute attr : esRelation.output()) { + if (generatedSet.contains(attr)) { + String newName = newNames.get(newNamesIndex++); + if (newName.equals(attr.name())) { + newAttributes.add(attr); + } else { + newAttributes.add(attr.withName(newName).withId(new NameId())); + } + } else { + newAttributes.add(attr); + } + } + + assert newAttributes.size() == numOutputAttributes; + return new JoinAsUnaryGeneratingPlan( + lookupJoin.replaceChildren(lookupJoin.left(), esRelation.withAttributes(newAttributes)) + ); + } + throw new IllegalStateException( + "right hand side of LOOKUP JOIN must be a relation, found [" + lookupJoin.right().getClass() + "]" + ); + } + + @Override + public int hashCode() { + return lookupJoin.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + JoinAsUnaryGeneratingPlan other = (JoinAsUnaryGeneratingPlan) obj; + + return lookupJoin.equals(other.lookupJoin); + } + } + @Override protected LogicalPlan rule(Join join) { if (join.left() instanceof Project projectChild && JoinTypes.LEFT.equals(join.config().type()) && join.right() instanceof EsRelation lookupIndex && lookupIndex.indexMode() == IndexMode.LOOKUP) { - // TODO: refactor the relevant part of pushGeneratingPlanPastProjectAndOrderBy and re-use it here. - return join; + + var joinAsGeneratingUnary = new JoinAsUnaryGeneratingPlan(join); + var pushedDown = PushDownUtils.pushGeneratingPlanPastProjectAndOrderBy(joinAsGeneratingUnary); + + return pushedDown.transformDown(JoinAsUnaryGeneratingPlan.class, JoinAsUnaryGeneratingPlan::unwrap); } return join; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/TranslatorHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/TranslatorHandler.java index 4b7af5bf49de8..5c80ea048cf81 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/TranslatorHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/TranslatorHandler.java @@ -46,7 +46,7 @@ private static Query wrapFunctionQuery(Expression field, Query query) { } if (field instanceof FieldAttribute fa) { fa = fa.getExactInfo().hasExact() ? fa.exactAttribute() : fa; - return new SingleValueQuery(query, fa.name(), false); + return new SingleValueQuery(query, fa.fieldName(), false); } if (field instanceof MetadataAttribute) { return query; // MetadataAttributes are always single valued diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 8619b88dd56e6..8796e1fccd743 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -6802,6 +6802,20 @@ public void testMultipleLookupShadowing() { var limit3 = asLimit(eval.child(), 1000, false); } + /** + * Expects + * + * EsqlProject[[languages{f}#16, emp_no{f}#13, languages{f}#16 AS language_code, $$language_name$temp_name$28{f}#29 AS langua + * ge_name]] + * \_Limit[1000[INTEGER],true] + * \_Join[LEFT,[languages{f}#16],[languages{f}#16],[language_code{f}#26]] + * |_Limit[1000[INTEGER],true] + * | \_Join[LEFT,[languages{f}#16],[languages{f}#16],[language_code{f}#24]] + * | |_Limit[1000[INTEGER],false] + * | | \_EsRelation[test][_meta_field{f}#19, emp_no{f}#13, first_name{f}#14, gender{f}#15, hire_d..] + * | \_EsRelation[languages_lookup][LOOKUP][language_code{f}#24] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#26, $$language_name$temp_name$28{f}#29] + */ public void testMultipleLookupProject() { // TODO a test case where pushing down past the RENAME would shadow // analogous to @@ -6811,6 +6825,7 @@ public void testMultipleLookupProject() { String query = """ FROM test + | KEEP languages, emp_no | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code | RENAME language_name AS foo From f810bea75d7ca03082b734aa8aafe80efcb5bf2b Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Wed, 7 May 2025 09:33:48 +0200 Subject: [PATCH 5/5] Update required capability for test --- .../esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 4b85b21580470..fdeeb3fc90e36 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -145,6 +145,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; multipleLookupsAndProjects +required_capability: join_lookup_v12 required_capability: join_lookup_enable_pushdown_past_project # TODO: this test, but with a lookup index that has multiple matches per row to show the first LOOKUP's effect, too. # TODO: tests like this that will look up the exact subfield of a text field