Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
package org.elasticsearch.xpack.esql.optimizer.rules.logical;

import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.plan.logical.CardinalityPreserving;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
import org.elasticsearch.xpack.esql.plan.logical.Streaming;

import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -46,8 +46,8 @@ protected LogicalPlan rule(Enrich en, LogicalOptimizerContext ctx) {
seenLimits.add(l);
return;
}
if ((p instanceof CardinalityPreserving) == false // can change the number of rows, so we can't just pull a limit from
// under it
if ((p instanceof Streaming) == false // can change the number of rows, so we can't just pull a limit from
// under it
// this will fail the verifier anyway, so no need to continue
|| (p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.COORDINATOR)
// This is essentially another remote enrich - let it take care of its own limits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.expression.Order;
import org.elasticsearch.xpack.esql.plan.logical.CardinalityPreserving;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Streaming;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;

Expand Down Expand Up @@ -132,8 +132,8 @@ protected LogicalPlan rule(Enrich en) {
return new Project(en.source(), copyTop, outputs);
}
}
if ((plan instanceof CardinalityPreserving) == false // can change the number of rows, so we can't just pull a TopN from
// under it
if ((plan instanceof Streaming) == false // can change the number of rows, so we can't just pull a TopN from
// under it
// this will fail the verifier anyway, so no need to continue
|| (plan instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.COORDINATOR)
// This is another remote Enrich, it can handle its own limits
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.List;
import java.util.Objects;

public class Drop extends UnaryPlan implements TelemetryAware, CardinalityPreserving, SortAgnostic {
public class Drop extends UnaryPlan implements TelemetryAware, Streaming, SortAgnostic {
private final List<NamedExpression> removals;

public Drop(Source source, LogicalPlan child, List<NamedExpression> removals) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class Enrich extends UnaryPlan
PostOptimizationVerificationAware.CoordinatorOnly,
PostAnalysisVerificationAware,
TelemetryAware,
CardinalityPreserving,
Streaming,
SortAgnostic,
ExecutesOn {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class Eval extends UnaryPlan
GeneratingPlan<Eval>,
PostAnalysisVerificationAware,
TelemetryAware,
CardinalityPreserving,
Streaming,
SortAgnostic {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Eval", Eval::new);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.List;
import java.util.Objects;

public class Insist extends UnaryPlan implements SurrogateLogicalPlan, CardinalityPreserving {
public class Insist extends UnaryPlan implements SurrogateLogicalPlan, Streaming {
private final List<? extends Attribute> insistedAttributes;
private @Nullable List<Attribute> lazyOutput = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.List;
import java.util.Objects;

public class Keep extends Project implements TelemetryAware, CardinalityPreserving, SortAgnostic {
public class Keep extends Project implements TelemetryAware, Streaming, SortAgnostic {

public Keep(Source source, LogicalPlan child, List<? extends NamedExpression> projections) {
super(source, child, projections);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
/**
* A {@code Project} is a {@code Plan} with one child. In {@code FROM idx | KEEP x, y}, the {@code KEEP} statement is a Project.
*/
public class Project extends UnaryPlan implements CardinalityPreserving, SortAgnostic {
public class Project extends UnaryPlan implements Streaming, SortAgnostic {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Project", Project::new);

private final List<? extends NamedExpression> projections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class RegexExtract extends UnaryPlan
implements
GeneratingPlan<RegexExtract>,
PostAnalysisVerificationAware,
CardinalityPreserving,
Streaming,
SortAgnostic {
protected final Expression input;
protected final List<Attribute> extractedFields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.List;
import java.util.Objects;

public class Rename extends UnaryPlan implements TelemetryAware, CardinalityPreserving, SortAgnostic {
public class Rename extends UnaryPlan implements TelemetryAware, Streaming, SortAgnostic {

private final List<Alias> renamings;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plan.logical;

/**
* This interface marks commands which do not add or remove rows and aren't sensitive to the exact order of the rows.
* This is required to decide whether a command is compatible with
* {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.HoistRemoteEnrichLimit} and
* {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.HoistRemoteEnrichTopN}.
* <p>
* For the most part, such commands can be thought of to be operating on a row-by-row basis. A more formal definition is that this command
* can be run on data nodes and this sequence
* <pre>{@code
* ... LIMIT X | MY_COMMAND
* }</pre>
* is safe to be replaced by this sequence
* <pre>{@code
* ... local LIMIT X | MY_COMMAND | LIMIT X
* }</pre>
* where "local" means that it's correct to apply the limit only on the data node, without a corresponding reduction on the coordinator.
* See {@link Limit#local()}.
* <p>
* We also require the same condition to hold for {@code TopN}, that is, the following are equivalent
* <pre>{@code
* ... TOP N [field1, ..., fieldN] | MY_COMMAND
* ... local TOP N [field1, ..., fieldN] | MY_COMMAND | TOP N [field1, ..., fieldN]
* }</pre>
* as long as MY_COMMAND preserves the columns that we order by.
* <p>
* Most commands that satisfy this will also satisfy the simpler (but stronger) conditions that the following are equivalent:
* <pre>{@code
* ... LIMIT X | MY_COMMAND
* ... MY_COMMAND | LIMIT X
*
* and
*
* ... TOP N [field1, ..., fieldN] | MY_COMMAND
* ... | MY_COMMAND | TOP N [field1, ..., fieldN]
* }</pre>
* <p>
* It is not true, for example, for WHERE:
* <pre>{@code
* ... TOP X [field] | WHERE side="dark"
* }</pre>
* If the first X rows do not contain any "dark" rows, the result is empty, however if we switch:
* <pre>{@code
* ... local TOP X [field] | WHERE side="dark" | TOP X [field]
* }</pre>
* and we have N nodes, then the first N*X rows may contain "dark" rows, and the final result is not empty in this case.
*/
public interface Streaming {}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
import org.elasticsearch.xpack.esql.plan.logical.CardinalityPreserving;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic;
import org.elasticsearch.xpack.esql.plan.logical.Streaming;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;

import java.io.IOException;
Expand All @@ -25,7 +25,7 @@

public abstract class InferencePlan<PlanType extends InferencePlan<PlanType>> extends UnaryPlan
implements
CardinalityPreserving,
Streaming,
SortAgnostic,
GeneratingPlan<InferencePlan<PlanType>>,
ExecutesOn.Coordinator {
Expand Down