-
Notifications
You must be signed in to change notification settings - Fork 25.3k
ESQL: Improve local folding of aggregates #103670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Introduce a local rule for folding aggregates to take into account the aggregate intermediate state as oppose to that on the coordinator.
Hi @costin, I've created a changelog YAML for you. |
Pinging @elastic/es-ql (Team:QL) |
Pinging @elastic/elasticsearch-esql (:Query Languages/ES|QL) |
/** | ||
* Local aggregation can only produce intermediate state that get wired into the global agg. | ||
*/ | ||
private static class LocalPropagateEmptyRelation extends PropagateEmptyRelation { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The gist of this PR - override the global rule with a specific local rule that takes into account the intermediate aggregation and also takes care of handling the boolean types (seen).
This removes the weird logic downstream in the LocalExecutionPlanner as its much more contained.
int i = 0; | ||
for (var agg : aggs) { | ||
// there needs to be an alias | ||
if (agg instanceof Alias a && a.child() instanceof AggregateFunction aggFunc) { | ||
List<Attribute> output = AbstractPhysicalOperationProviders.intermediateAttributes(List.of(agg), List.of()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This section is now moved in the local rule.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good and simplifies the code a bit, thumbs up!
List<Attribute> output = AbstractPhysicalOperationProviders.intermediateAttributes(List.of(agg), List.of()); | ||
for (Attribute o : output) { | ||
DataType dataType = o.dataType(); | ||
// look for count(literal) with literal != null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this comment is misplaced and should go on top of line 181.
if (aggFunc instanceof Count count && (count.foldable() == false || count.fold() != null)) { | ||
wrapper.accept(0L); | ||
} else { | ||
// otherwise just put null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit: comment not useful; also could be ternary just as in the super aggOutput
; in fact, since it's the same here, we could even factor this out into a helper method.
@@ -1941,6 +2045,7 @@ private PhysicalPlan physicalPlan(String query) { | |||
var logical = logicalOptimizer.optimize(analyzer.analyze(parser.createStatement(query))); | |||
// System.out.println("Logical\n" + logical); | |||
var physical = mapper.map(logical); | |||
// System.out.println(physical); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leftover
...in/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
Show resolved
Hide resolved
* \_AggregateExec[[],[COUNT(emp_no{f}#6) AS c],FINAL,null] | ||
* \_ExchangeExec[[count{r}#16, seen{r}#17],true] | ||
* \_FragmentExec[filter=null, estimatedRowSize=0, fragment=[ | ||
* Aggregate[[],[COUNT(emp_no{f}#6) AS c]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you put this line on the same as the one above? It will fit the IDE line size limit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
💔 Backport failed
You can use sqren/backport to manually backport by running |
Introduce a local rule for folding aggregates to take into account the aggregate intermediate state as oppose to that on the coordinator. (cherry picked from commit cf77469)
Backport elastic#103670 to 8.12
Introduce a local rule for folding aggregates to take into account the
aggregate intermediate state as oppose to that on the coordinator.