From 47150d48bb29c18e979f603c05965b4b4c59c542 Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Thu, 23 Oct 2025 18:04:08 +0200 Subject: [PATCH] CardinalityPreserving->Streaming + reword (#137041) * Rename CardinalityPreserving -> Streaming * Reword the definition and give a precise, formal version of it so we can more easily check it against new commands. --- .../rules/logical/HoistRemoteEnrichLimit.java | 6 +- .../rules/logical/HoistRemoteEnrichTopN.java | 6 +- .../plan/logical/CardinalityPreserving.java | 34 ----------- .../xpack/esql/plan/logical/Drop.java | 2 +- .../xpack/esql/plan/logical/Enrich.java | 2 +- .../xpack/esql/plan/logical/Eval.java | 2 +- .../xpack/esql/plan/logical/Insist.java | 2 +- .../xpack/esql/plan/logical/Keep.java | 2 +- .../xpack/esql/plan/logical/Project.java | 2 +- .../xpack/esql/plan/logical/RegexExtract.java | 2 +- .../xpack/esql/plan/logical/Rename.java | 2 +- .../xpack/esql/plan/logical/Streaming.java | 56 +++++++++++++++++++ .../plan/logical/inference/InferencePlan.java | 4 +- 13 files changed, 72 insertions(+), 50 deletions(-) delete mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/CardinalityPreserving.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Streaming.java diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimit.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimit.java index 8915777e3ec36..e6e46596bab10 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimit.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimit.java @@ -8,12 +8,12 @@ package org.elasticsearch.xpack.esql.optimizer.rules.logical; import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; -import org.elasticsearch.xpack.esql.plan.logical.CardinalityPreserving; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; +import org.elasticsearch.xpack.esql.plan.logical.Streaming; import java.util.Collections; import java.util.Comparator; @@ -46,8 +46,8 @@ protected LogicalPlan rule(Enrich en, LogicalOptimizerContext ctx) { seenLimits.add(l); return; } - if ((p instanceof CardinalityPreserving) == false // can change the number of rows, so we can't just pull a limit from - // under it + if ((p instanceof Streaming) == false // can change the number of rows, so we can't just pull a limit from + // under it // this will fail the verifier anyway, so no need to continue || (p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.COORDINATOR) // This is essentially another remote enrich - let it take care of its own limits diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java index df13ca20b954b..7ddb464b50b0e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java @@ -15,13 +15,13 @@ import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.expression.Order; -import org.elasticsearch.xpack.esql.plan.logical.CardinalityPreserving; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; import org.elasticsearch.xpack.esql.plan.logical.Project; +import org.elasticsearch.xpack.esql.plan.logical.Streaming; import org.elasticsearch.xpack.esql.plan.logical.TopN; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; @@ -132,8 +132,8 @@ protected LogicalPlan rule(Enrich en) { return new Project(en.source(), copyTop, outputs); } } - if ((plan instanceof CardinalityPreserving) == false // can change the number of rows, so we can't just pull a TopN from - // under it + if ((plan instanceof Streaming) == false // can change the number of rows, so we can't just pull a TopN from + // under it // this will fail the verifier anyway, so no need to continue || (plan instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.COORDINATOR) // This is another remote Enrich, it can handle its own limits diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/CardinalityPreserving.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/CardinalityPreserving.java deleted file mode 100644 index 89e554989db28..0000000000000 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/CardinalityPreserving.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.plan.logical; - -/** - * This interface marks a command which does not add or remove rows, and is neutral towards limit aggregation. - * This means that this sequence: - * ``` - * ... LIMIT X | MY_COMMAND - * ``` - * is safe to replace with this sequence: - * ``` - * ... local LIMIT X | MY_COMMAND | LIMIT X - * Where the local limit is applied only on the node. - * ``` - * It is not true, for example, for WHERE: - * ``` - * ... LIMIT X | WHERE side="dark" - * ``` - * If the first X rows do not contain any "dark" rows, the result is empty, however if we switch: - * ``` - * ... local LIMIT X | WHERE side="dark" | LIMIT X - * ``` - * and we have N nodes, then the first N*X rows may contain "dark" rows, and the final result is not empty in this case. - *
- * This property is important for processing Limit and TopN with remote operations such as remote ENRICH, since it allows - * us to localize the limits without changing the semantics. - */ -public interface CardinalityPreserving {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java index 6412aa1890994..b40b690514b92 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java @@ -17,7 +17,7 @@ import java.util.List; import java.util.Objects; -public class Drop extends UnaryPlan implements TelemetryAware, CardinalityPreserving, SortAgnostic { +public class Drop extends UnaryPlan implements TelemetryAware, Streaming, SortAgnostic { private final List removals; public Drop(Source source, LogicalPlan child, List removals) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java index 89d7c5ba81234..cf8b0b5d3e709 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java @@ -51,7 +51,7 @@ public class Enrich extends UnaryPlan PostOptimizationVerificationAware.CoordinatorOnly, PostAnalysisVerificationAware, TelemetryAware, - CardinalityPreserving, + Streaming, SortAgnostic, ExecutesOn { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Eval.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Eval.java index d9ac0e720dd1f..33efd1d9d0806 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Eval.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Eval.java @@ -41,7 +41,7 @@ public class Eval extends UnaryPlan GeneratingPlan, PostAnalysisVerificationAware, TelemetryAware, - CardinalityPreserving, + Streaming, SortAgnostic { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Eval", Eval::new); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Insist.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Insist.java index 8601ef456bef9..63ca4e1099a5f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Insist.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Insist.java @@ -18,7 +18,7 @@ import java.util.List; import java.util.Objects; -public class Insist extends UnaryPlan implements SurrogateLogicalPlan, CardinalityPreserving { +public class Insist extends UnaryPlan implements SurrogateLogicalPlan, Streaming { private final List insistedAttributes; private @Nullable List lazyOutput = null; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Keep.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Keep.java index 4428e5a4ebe34..f680ef9fbc64e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Keep.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Keep.java @@ -15,7 +15,7 @@ import java.util.List; import java.util.Objects; -public class Keep extends Project implements TelemetryAware, CardinalityPreserving, SortAgnostic { +public class Keep extends Project implements TelemetryAware, Streaming, SortAgnostic { public Keep(Source source, LogicalPlan child, List projections) { super(source, child, projections); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Project.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Project.java index 74a99a0212349..e9334b1f99fbc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Project.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Project.java @@ -27,7 +27,7 @@ /** * A {@code Project} is a {@code Plan} with one child. In {@code FROM idx | KEEP x, y}, the {@code KEEP} statement is a Project. */ -public class Project extends UnaryPlan implements CardinalityPreserving, SortAgnostic { +public class Project extends UnaryPlan implements Streaming, SortAgnostic { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Project", Project::new); private final List projections; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RegexExtract.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RegexExtract.java index 04720da6dca26..9bb8e1a70e3f8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RegexExtract.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RegexExtract.java @@ -28,7 +28,7 @@ public abstract class RegexExtract extends UnaryPlan implements GeneratingPlan, PostAnalysisVerificationAware, - CardinalityPreserving, + Streaming, SortAgnostic { protected final Expression input; protected final List extractedFields; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Rename.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Rename.java index 9fe49b50eeee3..e89319da3d1d4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Rename.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Rename.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Objects; -public class Rename extends UnaryPlan implements TelemetryAware, CardinalityPreserving, SortAgnostic { +public class Rename extends UnaryPlan implements TelemetryAware, Streaming, SortAgnostic { private final List renamings; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Streaming.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Streaming.java new file mode 100644 index 0000000000000..e1e025aecc3a4 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Streaming.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical; + +/** + * This interface marks commands which do not add or remove rows and aren't sensitive to the exact order of the rows. + * This is required to decide whether a command is compatible with + * {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.HoistRemoteEnrichLimit} and + * {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.HoistRemoteEnrichTopN}. + *

+ * For the most part, such commands can be thought of to be operating on a row-by-row basis. A more formal definition is that this command + * can be run on data nodes and this sequence + *

{@code
+ *  ... LIMIT X | MY_COMMAND
+ *  }
+ * is safe to be replaced by this sequence + *
{@code
+ *  ... local LIMIT X | MY_COMMAND | LIMIT X
+ *  }
+ * where "local" means that it's correct to apply the limit only on the data node, without a corresponding reduction on the coordinator. + * See {@link Limit#local()}. + *

+ * We also require the same condition to hold for {@code TopN}, that is, the following are equivalent + *

{@code
+ *  ... TOP N [field1, ..., fieldN] | MY_COMMAND
+ *  ... local TOP N [field1, ..., fieldN] | MY_COMMAND | TOP N [field1, ..., fieldN]
+ *  }
+ * as long as MY_COMMAND preserves the columns that we order by. + *

+ * Most commands that satisfy this will also satisfy the simpler (but stronger) conditions that the following are equivalent: + *

{@code
+ *  ... LIMIT X | MY_COMMAND
+ *  ... MY_COMMAND | LIMIT X
+ *
+ *  and
+ *
+ * ... TOP N [field1, ..., fieldN] | MY_COMMAND
+ * ... | MY_COMMAND | TOP N [field1, ..., fieldN]
+ *  }
+ *

+ * It is not true, for example, for WHERE: + *

{@code
+ *  ... TOP X [field] | WHERE side="dark"
+ *  }
+ * If the first X rows do not contain any "dark" rows, the result is empty, however if we switch: + *
{@code
+ *  ... local TOP X [field] | WHERE side="dark" | TOP X [field]
+ *  }
+ * and we have N nodes, then the first N*X rows may contain "dark" rows, and the final result is not empty in this case. + */ +public interface Streaming {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java index 9756c0b5a8176..2b40bbebfbcd3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java @@ -13,10 +13,10 @@ import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.plan.GeneratingPlan; -import org.elasticsearch.xpack.esql.plan.logical.CardinalityPreserving; import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic; +import org.elasticsearch.xpack.esql.plan.logical.Streaming; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import java.io.IOException; @@ -25,7 +25,7 @@ public abstract class InferencePlan> extends UnaryPlan implements - CardinalityPreserving, + Streaming, SortAgnostic, GeneratingPlan>, ExecutesOn.Coordinator {