Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions datafusion/datasource/src/file_scan_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,14 +937,19 @@ impl DataSource for FileScanConfig {
/// │ → SortExec removed, fetch (LIMIT) pushed to DataSourceExec
/// │
/// ├─► FileSource returns Inexact
/// │ (reverse_row_groups=true)
/// │ → SortExec kept, scan optimized
/// │ (e.g. column_in_file_schema: opener will reorder RGs at runtime)
/// │ → rebuild_with_source: sort files by stats; if the post-sort
/// │ file groups are non-overlapping AND the request now validates
/// │ AND no NULLs sit in the sort columns of non-last files,
/// │ upgrade back to Exact (SortExec removed). Otherwise stays
/// │ Inexact and SortExec is kept while the scan is still
/// │ optimised via `sort_order_for_reorder` / `reverse_row_groups`.
/// │
/// └─► FileSource returns Unsupported
/// (ordering stripped because files in wrong order)
/// (e.g. expression sort key or partition column)
/// → try_sort_file_groups_by_statistics():
/// 1. Sort files within each group by min/max statistics
/// 2. Re-check: non-overlapping + ordering valid?
/// 2. Re-check: non-overlapping + ordering valid + no NULLs?
/// YES → Exact → SortExec removed
/// NO → Inexact (files reordered, Sort stays)
/// ```
Expand Down Expand Up @@ -973,8 +978,42 @@ impl DataSource for FileScanConfig {
}
}
SortOrderPushdownResult::Inexact { inner } => {
Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(self.rebuild_with_source(inner, false, order)?),
let mut config = self.rebuild_with_source(inner, false, order)?;
// `rebuild_with_source` reorders files by stats; if the
// post-sort files are non-overlapping AND the request now
// validates against the new file groups, `output_ordering`
// is preserved and we can upgrade back to Exact. This
// restores the sort-elimination behaviour that lived in
// the `Unsupported` → `try_sort_file_groups_by_statistics`
// path before #21956 routed `column_in_file_schema` cases
// here.
if config.output_ordering.is_empty() {
return Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(config),
});
}
// Upgrading to Exact: the post-sort file groups are
// non-overlapping and each file's declared ordering
// re-validates, so reading the files in their natural
// (declared-sorted) order already yields the requested
// ordering — exactly like the `Unsupported` → Exact path,
// which reads files in natural order too.
//
// Drop the runtime row-group reorder hints the Inexact
// source carried (`sort_order_for_reorder` /
// `reverse_row_groups`) by restoring the original,
// hint-free source. With the `SortExec` removed those
// hints are not just redundant but unsafe: for a DESC
// request the opener sorts row groups ASC-by-min and then
// reverses them, which incorrectly reorders two row groups
// within a single file that share the same `min`
// (e.g. a file `[10,8,8,8]` whose row groups are
// `[10,8]` and `[8,8]` would stream as `8,8,10,8`).
// The `SortExec` used to mask this; once it is gone the
// reordered stream is the final, wrong answer.
config.file_source = Arc::clone(&self.file_source);
Ok(SortOrderPushdownResult::Exact {
inner: Arc::new(config),
})
}
SortOrderPushdownResult::Unsupported => {
Expand Down
93 changes: 69 additions & 24 deletions datafusion/datasource/src/file_scan_config/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,31 +138,76 @@ impl FileScanConfig {
false
};

if is_exact && all_non_overlapping {
// Truly exact: within-file ordering guaranteed and files are non-overlapping.
// Keep output_ordering so SortExec can be eliminated for each partition.
//
// We intentionally do NOT redistribute files across groups here.
// The planning-phase bin-packing may interleave file ranges across groups:
//
// Group 0: [f1(1-10), f3(21-30)] ← interleaved with group 1
// Group 1: [f2(11-20), f4(31-40)]
//
// This interleaving is actually beneficial because SPM pulls from both
// partitions concurrently, keeping parallel I/O active:
//
// SPM: pull P0 [1-10] → pull P1 [11-20] → pull P0 [21-30] → pull P1 [31-40]
// ^^^^^^^^^^^^ ^^^^^^^^^^^^
// both partitions scanning files simultaneously
//
// If we were to redistribute files consecutively:
// Group 0: [f1(1-10), f2(11-20)] ← all values < group 1
// Group 1: [f3(21-30), f4(31-40)]
// Decide whether to keep `output_ordering` (i.e. let the outer
// pushdown report `Exact` and drop `SortExec`).
//
// Two paths can produce a keep:
//
// 1. `is_exact && all_non_overlapping`: the source already had
// validated ordering and the post-sort files still don't
// overlap — Exact carries through unchanged.
//
// 2. `!is_exact && all_non_overlapping`: source returned
// `Inexact` because pre-sort `validated_output_ordering()`
// stripped the declaration (files were listed out of order
// on disk). After our stats-based sort the files are now
// non-overlapping — re-validate against the new file
// groups and, if it passes, upgrade back to Exact so the
// outer wrapper drops the `SortExec`. Without this, the
// `Inexact` branch stayed Inexact even when reorder
// restored a perfectly valid ordering, leaving an
// unnecessary `SortExec` above the source (regression
// after #21956's `column_in_file_schema` signal pushed
// this scenario into the Inexact branch instead of the
// `try_sort_file_groups_by_statistics` fallback).
//
// We intentionally do NOT redistribute files across groups here.
// The planning-phase bin-packing may interleave file ranges across groups:
//
// Group 0: [f1(1-10), f3(21-30)] ← interleaved with group 1
// Group 1: [f2(11-20), f4(31-40)]
//
// This interleaving is actually beneficial because SPM pulls from both
// partitions concurrently, keeping parallel I/O active.
let keep_ordering = match (all_non_overlapping, is_exact) {
// Files still overlap after the stats sort — the combined
// stream isn't ordered, so `output_ordering` must be dropped.
(false, _) => false,
// Source already had validated ordering and the post-sort
// files still don't overlap — Exact carries through.
(true, true) => true,
// Source returned `Inexact`; re-validate against the
// reordered file groups to decide whether to upgrade.
//
// SPM would read ALL of group 0 first (values always smaller), then group 1.
// This degrades to single-threaded sequential I/O — the other partition
// sits idle the entire time, losing the parallelism benefit.
} else {
// Same NULL guard as `try_sort_file_groups_by_statistics`:
// we cannot claim Exact if any non-last file contains
// NULLs in the sort columns. With NULLS LAST those
// NULLs sit after all non-null rows in the file, so
// when the next file's non-nulls are smaller than the
// previous file's max, they'd appear *after* the NULLs
// in the concatenated stream — breaking the ordering.
(true, false) => {
let projected_schema = new_config.projected_schema()?;
let projection_indices = new_config
.file_source
.projection()
.as_ref()
.and_then(|p| ordered_column_indices_from_projection(p));
if any_file_has_nulls_in_sort_columns(
&new_config.file_groups,
order,
&projected_schema,
projection_indices.as_deref(),
) {
false
} else {
let new_eq_props = new_config.eq_properties();
new_eq_props.ordering_satisfy(order.iter().cloned())?
}
}
};

if !keep_ordering {
new_config.output_ordering = vec![];
}

Expand Down
Loading
Loading