+ * For the most part, such commands can be thought of to be operating on a row-by-row basis. A more formal definition is that this command + * can be run on data nodes and this sequence + *
{@code
+ * ... LIMIT X | MY_COMMAND
+ * }
+ * is safe to be replaced by this sequence
+ * {@code
+ * ... local LIMIT X | MY_COMMAND | LIMIT X
+ * }
+ * where "local" means that it's correct to apply the limit only on the data node, without a corresponding reduction on the coordinator.
+ * See {@link Limit#local()}.
+ * + * We also require the same condition to hold for {@code TopN}, that is, the following are equivalent + *
{@code
+ * ... TOP N [field1, ..., fieldN] | MY_COMMAND
+ * ... local TOP N [field1, ..., fieldN] | MY_COMMAND | TOP N [field1, ..., fieldN]
+ * }
+ * as long as MY_COMMAND preserves the columns that we order by.
+ * + * Most commands that satisfy this will also satisfy the simpler (but stronger) conditions that the following are equivalent: + *
{@code
+ * ... LIMIT X | MY_COMMAND
+ * ... MY_COMMAND | LIMIT X
+ *
+ * and
+ *
+ * ... TOP N [field1, ..., fieldN] | MY_COMMAND
+ * ... | MY_COMMAND | TOP N [field1, ..., fieldN]
+ * }
+ * + * It is not true, for example, for WHERE: + *
{@code
+ * ... TOP X [field] | WHERE side="dark"
+ * }
+ * If the first X rows do not contain any "dark" rows, the result is empty, however if we switch:
+ * {@code
+ * ... local TOP X [field] | WHERE side="dark" | TOP X [field]
+ * }
+ * and we have N nodes, then the first N*X rows may contain "dark" rows, and the final result is not empty in this case.
+ */
+public interface Streaming {}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java
index 9756c0b5a8176..2b40bbebfbcd3 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java
@@ -13,10 +13,10 @@
import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
-import org.elasticsearch.xpack.esql.plan.logical.CardinalityPreserving;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic;
+import org.elasticsearch.xpack.esql.plan.logical.Streaming;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import java.io.IOException;
@@ -25,7 +25,7 @@
public abstract class InferencePlan