Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,45 @@ total_duration:long | day:date | days:date
17016205 |2023-10-23T13:00:00.000Z|[2023-10-23T12:00:00.000Z, 2023-10-23T13:00:00.000Z]
;

evalBeforeDoubleInlinestats1
required_capability: inlinestats_v10

FROM employees
| EVAL salaryK = salary/1000
| INLINESTATS count = COUNT(*) BY salaryK
| INLINESTATS min = MIN(MV_COUNT(languages)) BY salaryK
| SORT emp_no
| KEEP emp_no, still_hired, count
| LIMIT 5
;

emp_no:integer |still_hired:boolean|count:long
10001 |true |1
10002 |true |3
10003 |false |2
10004 |true |2
10005 |true |1
;

evalBeforeDoubleInlinestats2
required_capability: inlinestats_v10

FROM employees
| EVAL jobs = MV_COUNT(job_positions)
| INLINESTATS count = COUNT(*) BY jobs
| INLINESTATS min = MIN(MV_COUNT(languages)) BY jobs
| SORT emp_no
| KEEP emp_no, jobs, count, min
| LIMIT 5
;

emp_no:integer |jobs:integer|count:long|min:integer
10001 |2 |18 |1
10002 |1 |27 |1
10003 |null |11 |1
10004 |4 |26 |1
10005 |null |11 |1
;

evalBeforeInlinestatsAndKeepAfter1
required_capability: inlinestats_v10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand All @@ -37,7 +36,7 @@ public PropagateEmptyRelation() {
@Override
protected LogicalPlan rule(UnaryPlan plan, LogicalOptimizerContext ctx) {
LogicalPlan p = plan;
if (plan.child() instanceof LocalRelation local && local.supplier() == EmptyLocalSupplier.EMPTY) {
if (plan.child() instanceof LocalRelation local && local.hasEmptySupplier()) {
// only care about non-grouped aggs might return something (count)
if (plan instanceof Aggregate agg && agg.groupings().isEmpty()) {
List<Block> emptyBlocks = aggsFromEmpty(ctx.foldCtx(), agg.aggregates());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
Expand All @@ -26,7 +25,6 @@
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Sample;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
Expand Down Expand Up @@ -79,7 +77,7 @@ private static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder u
recheck.set(false);
p = switch (p) {
case Aggregate agg -> pruneColumnsInAggregate(agg, used, inlineJoin);
case InlineJoin inj -> pruneColumnsInInlineJoin(inj, used, recheck);
case InlineJoin inj -> pruneColumnsInInlineJoinRight(inj, used, recheck);
case Eval eval -> pruneColumnsInEval(eval, used, recheck);
case Project project -> inlineJoin ? pruneColumnsInProject(project, used) : p;
case EsRelation esr -> pruneColumnsInEsRelation(esr, used);
Expand Down Expand Up @@ -123,23 +121,9 @@ private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, Attribut
} else {
// not expecting high groups cardinality, nested loops in lists should be fine, no need for a HashSet
if (inlineJoin && aggregate.groupings().containsAll(remaining)) {
// It's an INLINEJOIN and all remaining attributes are groupings, which are already part of the IJ output (from the
// left-hand side).
// TODO: INLINESTATS: revisit condition when adding support for INLINESTATS filters
if (aggregate.child() instanceof StubRelation stub) {
var message = "Aggregate groups references ["
+ remaining
+ "] not in child's (StubRelation) output: ["
+ stub.outputSet()
+ "]";
assert stub.outputSet().containsAll(Expressions.asAttributes(remaining)) : message;

p = emptyLocalRelation(aggregate);
} else {
// There are no aggregates to compute, just output the groupings; these are already in the IJ output, so only
// restrict the output to what remained.
p = new Project(aggregate.source(), aggregate.child(), remaining);
}
// An INLINEJOIN right-hand side aggregation output had everything pruned, except for (some of the) groupings, which are
// already part of the IJ output (from the left-hand side): the agg can just be dropped entirely.
p = emptyLocalRelation(aggregate);
} else { // not an INLINEJOIN or there are actually aggregates to compute
p = aggregate.with(aggregate.groupings(), remaining);
}
Expand All @@ -148,12 +132,12 @@ private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, Attribut
return p;
}

private static LogicalPlan pruneColumnsInInlineJoin(InlineJoin ij, AttributeSet.Builder used, Holder<Boolean> recheck) {
private static LogicalPlan pruneColumnsInInlineJoinRight(InlineJoin ij, AttributeSet.Builder used, Holder<Boolean> recheck) {
LogicalPlan p = ij;

used.addAll(ij.references());
var right = pruneColumns(ij.right(), used, true);
if (right.output().isEmpty()) {
if (right.output().isEmpty() || isLocalEmptyRelation(right)) {
p = ij.left();
recheck.set(true);
} else if (right != ij.right()) {
Expand Down Expand Up @@ -181,18 +165,13 @@ private static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder us
return p;
}

// Note: only run when the Project is a descendent of an InlineJoin.
private static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.Builder used) {
LogicalPlan p = project;

var remaining = pruneUnusedAndAddReferences(project.projections(), used);
if (remaining != null) {
p = remaining.isEmpty() || remaining.stream().allMatch(FieldAttribute.class::isInstance)
? emptyLocalRelation(project)
: new Project(project.source(), project.child(), remaining);
} else if (project.output().stream().allMatch(FieldAttribute.class::isInstance)) {
// Use empty relation as a marker for a subsequent pass, in case the project is only outputting field attributes (which are
// already part of the INLINEJOIN left-hand side output).
p = emptyLocalRelation(project);
p = remaining.isEmpty() ? emptyLocalRelation(project) : new Project(project.source(), project.child(), remaining);
}

return p;
Expand All @@ -216,7 +195,11 @@ private static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet

private static LogicalPlan emptyLocalRelation(LogicalPlan plan) {
// create an empty local relation with no attributes
return new LocalRelation(plan.source(), List.of(), EmptyLocalSupplier.EMPTY);
return new LocalRelation(plan.source(), plan.output(), EmptyLocalSupplier.EMPTY);
}

private static boolean isLocalEmptyRelation(LogicalPlan plan) {
return plan instanceof LocalRelation local && local.hasEmptySupplier();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
out.writeNamedWriteable(supplier);
} else {
if (supplier == EmptyLocalSupplier.EMPTY) {
if (hasEmptySupplier()) {
out.writeVInt(0);
} else {// here we can only have an ImmediateLocalSupplier as this was the only implementation apart from EMPTY
((ImmediateLocalSupplier) supplier).writeTo(out);
Expand All @@ -77,6 +77,10 @@ public LocalSupplier supplier() {
return supplier;
}

public boolean hasEmptySupplier() {
return supplier == EmptyLocalSupplier.EMPTY;
}

@Override
public boolean expressionsResolved() {
return true;
Expand Down
Loading