Push down topk through join#21621
Conversation
| } else { | ||
| &join.right | ||
| }; | ||
| if matches!(preserved_child.as_ref(), LogicalPlan::Sort(_)) { |
There was a problem hiding this comment.
This condition looks a bit broad.
If the child has no fetch limit or a larger fetch limit than the current one then pushing down the current Sort with its fetch limit would be beneficial, no ?
The optimization should be skipped only if the Sort expr is different or its fetch limit is non-zero but smaller than the current one.
gene-bordegaray
left a comment
There was a problem hiding this comment.
The idea behind the optimizer rule itself makes sense. I think there is some room to add more regression testing and ensure we uphold correctness standards for all cases. Let me know what you think :)
|
|
||
| // Create the new Sort(fetch) on the preserved child | ||
| let new_child_sort = Arc::new(LogicalPlan::Sort(SortPlan { | ||
| expr: sort.expr.clone(), |
There was a problem hiding this comment.
I don't think we shoudl just clone sort.expr here. Since the Sort can sit on top of a Projection, in this case the ORDER BY clause of the query is interpreted against the projection output columns, not directly on the join's child.
When we push down the Sort(fetch) rather than cloning the Sort columns we need to push down the columns that were projected.
I believe behavior for this right now would work like this:
Sort: b, fetch=1
Projection: -t1.b AS b
Join
The optimizer rewrites it into:
Sort: b, fetch=1
Projection: -t1.b AS b
Join
Sort: b, fetch=1 -> This is using the post-projected value!
| # Child has larger fetch: push our tighter limit | ||
| # The inner Sort(fetch=5) has a larger limit than our outer Sort(fetch=2), | ||
| # so pushing fetch=2 to the preserved child reduces data further. | ||
| query TT | ||
| EXPLAIN SELECT * FROM ( | ||
| SELECT t1.a, t1.b, t2.x | ||
| FROM (SELECT * FROM t1 ORDER BY b ASC LIMIT 5) t1 | ||
| LEFT JOIN t2 ON t1.a = t2.x | ||
| ) sub | ||
| ORDER BY b ASC LIMIT 2; | ||
| ---- | ||
| logical_plan | ||
| 01)Sort: sub.b ASC NULLS LAST, fetch=2 | ||
| 02)--SubqueryAlias: sub | ||
| 03)----Left Join: t1.a = t2.x | ||
| 04)------SubqueryAlias: t1 | ||
| 05)--------Sort: t1.b ASC NULLS LAST, fetch=5 | ||
| 06)----------TableScan: t1 projection=[a, b] | ||
| 07)------TableScan: t2 projection=[x] |
There was a problem hiding this comment.
Seems like we don't actually push down the fetch=2 tighter limit into the nested Sort here.
There was a problem hiding this comment.
It is being blocked by subqueryAlias between sort and join. I think I need to update the comment.
There was a problem hiding this comment.
Right; couldn't we push the topk down despite the alias? This seems like a fairly common query structure that it would be nice to support.
There was a problem hiding this comment.
refactored code to handle SubqueryAlias
| if join.filter.is_some() { | ||
| return Ok(Transformed::no(plan)); | ||
| } |
There was a problem hiding this comment.
Might not be necessary for this PR, but would be pretty easy to check if the filter only references non-preserved-side columns, in which case I think we can still do the pushdown?
There was a problem hiding this comment.
removed filter. added UT to verify results are correct.
| /// (`Option<TableReference>`) structurally. A `Bare("t1")` and | ||
| /// `Full { catalog, schema, table: "t1" }` are NOT equal even though they | ||
| /// refer to the same column. After resolving through SubqueryAlias the | ||
| /// variant may differ, so we compare by display string instead. |
There was a problem hiding this comment.
How Expr::to_string() helps with the missing TableReference on one of the sides ?
I don't understand how this is better than Column::eq().
There was a problem hiding this comment.
updated comment and removed to_string()
gene-bordegaray
left a comment
There was a problem hiding this comment.
this is looking good 👍
| /// Input sort exprs: [neg_b ASC] | ||
| /// Output sort exprs: [(- t1.b) ASC] | ||
| /// ``` | ||
| fn resolve_sort_exprs_through_projection( |
There was a problem hiding this comment.
may be worth adding unit tests for this guy and resolve_sort_exprs_through_subquery_alias rather than just in slt files to make behavior expectations very clear
|
@martin-g @neilconway I have addressed review comments. PTAL |
|
is this still being worked on @SubhamSinghal ? |
|
@gene-bordegaray yes, just fixed UT. Please let me know the next step. |
gene-bordegaray
left a comment
There was a problem hiding this comment.
my approval is stale, will re-review soon. request changes is only way to remove my approval it looks :(
| Arc::new(EliminateOuterJoin::new()), | ||
| // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit | ||
| Arc::new(PushDownLimit::new()), | ||
| Arc::new(PushDownTopKThroughJoin::new()), |
There was a problem hiding this comment.
this runs before PushDownFilter. For Sort(fetch) -> Filter -> Join, it only fires on a later optimizer pass after the filter has moved. That works with the default max_passes = 3, but with max_passes = 1 the pushdown is missed. Should we run this after PushDownFilter?
There was a problem hiding this comment.
changed rule order to run after PushDownFilter
There was a problem hiding this comment.
Could we add a few more coverage cases? Like DESC ordering, NULLS FIRST, a multi-level outer join, a CROSS JOIN negative case, and maybe a tied-sort-key case. A custom volatile UDF case would also be useful if it is easy to add.
There was a problem hiding this comment.
Added more test cases. Would require help for custom volatile UDF case.
| let (inner_child, child_resolved_exprs) = match preserved_child.as_ref() { | ||
| LogicalPlan::SubqueryAlias(sq) => { | ||
| let exprs = | ||
| resolve_sort_exprs_through_subquery_alias(&resolved_sort_exprs, sq)?; | ||
| (sq.input.as_ref(), exprs) | ||
| } | ||
| _ => (preserved_child.as_ref(), resolved_sort_exprs.clone()), | ||
| }; |
There was a problem hiding this comment.
This only looks through one SubqueryAlias on the preserved side. If there are nested aliases, we may miss an existing inner Sort(fetch) and add another sort instead of tightening it. We should handle nested aliases.
There was a problem hiding this comment.
@kumarUjjawal Thanks for highlighting this, fixed issue with iterative traversal of nested aliases
2711783 to
036fb9f
Compare
Which issue does this PR close?
#11900
Rationale for this change
When a query has
ORDER BY <cols> LIMIT Non top of an outer join and all sort columns come from the preserved side,DataFusion currently runs the full join first, then sorts and limits. We can push a copy of the
Sort(fetch=N)to the preserved input, reducing the number of rows entering the join.Before:
After:
What changes are included in this PR?
A new logical optimizer rule
PushDownTopKThroughJointhat:Sortwithfetch = Some(N)(TopK)Projectionto find aJoinSort(fetch=N)on the preserved childAre these changes tested?
Yes through UT
Are there any user-facing changes?
No API changes.