Fix PushdownSort dropping LIMIT when eliminating SortExec #21744
sgrebnov wants to merge 3 commits into apache:main
xudong963
left a comment
LGTM. It would be better to add some SLT tests.
Also, do you want to include this in the new minor release?
  let inner = if let Some(fetch) = sort_child.fetch() {
-     inner.with_fetch(Some(fetch)).unwrap_or(inner)
+     inner.with_fetch(Some(fetch)).unwrap_or_else(|| {
+         Arc::new(GlobalLimitExec::new(inner, 0, Some(fetch)))
Line 102 mentions multi-partitioning (sort_child.preserve_partitioning()) but GlobalLimitExec requires single partitioning - https://docs.rs/datafusion-physical-plan/53.1.0/src/datafusion_physical_plan/limit.rs.html#170
@martin-g - great point, thank you. Updated to use LocalLimitExec. This is consistent with how enforce_sorting handles this.
// If the sort has a fetch, we need to add a limit:
if properties.output_partitioning().partition_count() == 1 {
    let mut global_limit =
        GlobalLimitExec::new(Arc::clone(sort_input), 0, Some(fetch));
    global_limit.set_required_ordering(required_ordering);
    Arc::new(global_limit)
} else {
    let mut local_limit = LocalLimitExec::new(Arc::clone(sort_input), fetch);
    local_limit.set_required_ordering(required_ordering);
    Arc::new(local_limit)
}
Note: I didn't add set_required_ordering here because the Exact result means the source's plan properties already guarantee the ordering, but happy to add it for consistency / if required - please let me know.
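The partition-count check in the snippet above can be illustrated with a small standalone model. This is only a sketch: `Plan`, `partition_count`, and `add_limit` are hypothetical stand-ins invented for illustration, not DataFusion's `ExecutionPlan`, `GlobalLimitExec`, or `LocalLimitExec`, and required-ordering handling is left out.

```rust
/// Toy stand-in for a physical plan node (hypothetical, not a DataFusion type).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A leaf source exposing `partitions` output partitions.
    Source { partitions: usize },
    /// Caps the total row count; in this model it is only built over one partition.
    GlobalLimit { input: Box<Plan>, skip: usize, fetch: usize },
    /// Caps the row count of each partition independently.
    LocalLimit { input: Box<Plan>, fetch: usize },
}

impl Plan {
    /// Number of output partitions this node produces in the toy model.
    fn partition_count(&self) -> usize {
        match self {
            Plan::Source { partitions } => *partitions,
            Plan::GlobalLimit { .. } => 1,
            Plan::LocalLimit { input, .. } => input.partition_count(),
        }
    }
}

/// Wrap `input` in a limit, choosing the variant by partition count,
/// following the same branch structure as the quoted snippet.
fn add_limit(input: Plan, fetch: usize) -> Plan {
    if input.partition_count() == 1 {
        Plan::GlobalLimit { input: Box::new(input), skip: 0, fetch }
    } else {
        Plan::LocalLimit { input: Box::new(input), fetch }
    }
}
```

In this model, `add_limit(Plan::Source { partitions: 4 }, 10)` yields a `LocalLimit` that still reports four partitions, while a single-partition source is wrapped in a `GlobalLimit`.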
alamb
left a comment
Thanks @sgrebnov and @martin-g and @xudong963
@xudong963: There's an existing SLT test (sort_pushdown.slt Test 1.3) that already covers the Exact pushdown + LIMIT case where the source supports
Which issue does this PR close?

When PushdownSort removes a SortExec because a source returns Exact (guaranteeing ordering), any fetch (LIMIT) on the SortExec is silently dropped if the underlying plan does not support with_fetch().

For example, ProjectionExec supports try_pushdown_sort (delegating to its child) but does not implement with_fetch(). A plan like SortExec(fetch=10) → ProjectionExec → source that gets sort-eliminated loses the limit.

What changes are included in this PR?

In the Exact branch of PushdownSort, when the eliminated SortExec carried a fetch:
- Try with_fetch() on the pushed-down source first
- If with_fetch() returns None, fall back to wrapping with GlobalLimitExec

Are these changes tested?

Yes. Three new unit tests:
- test_sort_pushdown_exact_no_fetch_no_limit: Exact elimination without fetch; no limit wrapper added
- test_sort_pushdown_exact_preserves_fetch_with_global_limit: Exact elimination with fetch, source does NOT support with_fetch(); GlobalLimitExec wrapper added
- test_sort_pushdown_exact_preserves_fetch_with_source_support: Exact elimination with fetch, source supports with_fetch(); limit pushed into source directly

Are there any user-facing changes?

No.
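The difference between the old `unwrap_or(inner)` behaviour and the fallback described in this PR can be sketched with a toy plan model. Everything here is an assumption made for illustration: `Node`, `preserve_fetch`, and `preserve_fetch_buggy` are hypothetical names, and `Node::with_fetch` only mirrors the general shape of a method that returns `None` when a node cannot absorb a limit. It is not the actual PushdownSort code.

```rust
/// Toy plan node (hypothetical, not a DataFusion type).
#[derive(Debug, Clone, PartialEq)]
enum Node {
    /// Source that can absorb a row limit itself.
    Scan { fetch: Option<usize> },
    /// Projection: passes rows through but cannot absorb a limit.
    Projection { input: Box<Node> },
    /// Explicit limit wrapper placed on top of another node.
    Limit { input: Box<Node>, fetch: usize },
}

impl Node {
    /// Returns `Some(new_node)` if this node can carry the limit itself,
    /// `None` otherwise.
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Node> {
        match self {
            Node::Scan { .. } => Some(Node::Scan { fetch }),
            _ => None,
        }
    }
}

/// Fixed behaviour: if the eliminated sort carried a fetch, either push it
/// into the node or wrap the node in an explicit limit.
fn preserve_fetch(inner: Node, sort_fetch: Option<usize>) -> Node {
    match sort_fetch {
        None => inner,
        Some(fetch) => match inner.with_fetch(Some(fetch)) {
            Some(with_limit) => with_limit,
            None => Node::Limit { input: Box::new(inner), fetch },
        },
    }
}

/// Pre-fix behaviour for comparison: when `with_fetch` returns `None`,
/// the node is returned unchanged and the limit is lost.
fn preserve_fetch_buggy(inner: Node, sort_fetch: Option<usize>) -> Node {
    match sort_fetch {
        Some(fetch) => inner.with_fetch(Some(fetch)).unwrap_or(inner),
        None => inner,
    }
}
```

Applied to a `Projection` over a `Scan` with `sort_fetch = Some(10)`, `preserve_fetch_buggy` returns the projection unchanged, whereas `preserve_fetch` wraps it in `Limit { fetch: 10 }`. This matches the three cases the unit tests describe: no fetch, fetch with a wrapper, and fetch pushed into the source.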