From 4130ce422388cf7ff00af26103ebd113f5b58c20 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 26 Mar 2026 23:52:34 +0800 Subject: [PATCH 01/17] feat: sort file groups by statistics during sort pushdown Sort files within each file group by min/max statistics during sort pushdown to better align with the requested ordering. When files are non-overlapping and within-file ordering is guaranteed (e.g. Parquet with sorting_columns metadata), the SortExec is completely eliminated. Key changes: - ParquetSource::try_pushdown_sort returns Exact when natural ordering satisfies the request, enabling sort elimination - FileScanConfig sorts files within groups by statistics and verifies non-overlapping property to determine Exact vs Inexact - Multi-group files are redistributed consecutively to preserve both sort elimination and I/O parallelism across partitions - Statistics-based file reordering as fallback when FileSource returns Unsupported (benefits TopK via better dynamic filter pruning) - New sort_pushdown benchmark for measuring sort elimination speedup Closes #17348 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../tests/physical_optimizer/pushdown_sort.rs | 10 +- datafusion/datasource-parquet/src/source.rs | 35 +- datafusion/datasource/src/file_scan_config.rs | 1018 +++++++++++++++-- .../physical-optimizer/src/pushdown_sort.rs | 22 +- 4 files changed, 974 insertions(+), 111 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs index d6fd4d8d00ae4..2f66aa3c3e209 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs @@ -255,8 +255,9 @@ fn test_prefix_match_through_transparent_nodes() { } #[test] -fn test_no_prefix_match_wrong_direction() { - // Test that prefix matching does NOT work if the direction is wrong +fn test_exact_prefix_match_same_direction() { + // Test that when the 
requested sort [a DESC] matches a prefix of the source's + // natural ordering [a DESC, b ASC], the Sort is eliminated (Exact pushdown). let schema = schema(); // Source has [a DESC, b ASC] ordering @@ -265,7 +266,7 @@ fn test_no_prefix_match_wrong_direction() { let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b]).unwrap(); let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); - // Request [a DESC] - same direction as source, NOT a reverse prefix + // Request [a DESC] - same direction as source prefix, Sort should be eliminated let same_direction = LexOrdering::new(vec![a.clone().reverse()]).unwrap(); let plan = sort_exec(same_direction, source); @@ -278,8 +279,7 @@ fn test_no_prefix_match_wrong_direction() { - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet output: Ok: - - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet " ); } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 64a339009e9cb..3a64137a2a3f8 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -743,19 +743,17 @@ impl FileSource for ParquetSource { /// /// With both pieces of information, ParquetSource can decide what optimizations to apply. /// - /// # Phase 1 Behavior (Current) - /// Returns `Inexact` when reversing the row group scan order would help satisfy the - /// requested ordering. 
We still need a Sort operator at a higher level because: - /// - We only reverse row group read order, not rows within row groups - /// - This provides approximate ordering that benefits limit pushdown - /// - /// # Phase 2 (Future) - /// Could return `Exact` when we can guarantee perfect ordering through techniques like: - /// - File reordering based on statistics - /// - Detecting already-sorted data - /// This would allow removing the Sort operator entirely. + /// # Behavior + /// - Returns `Exact` when the file's natural ordering (from Parquet metadata) already + /// satisfies the requested ordering. This allows the Sort operator to be eliminated + /// if the files within each group are also non-overlapping (checked by FileScanConfig). + /// - Returns `Inexact` when reversing the row group scan order would help satisfy the + /// requested ordering. We still need a Sort operator at a higher level because: + /// - We only reverse row group read order, not rows within row groups + /// - This provides approximate ordering that benefits limit pushdown /// /// # Returns + /// - `Exact`: The file's natural ordering satisfies the request (within-file ordering guaranteed) /// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order /// - `Unsupported`: Cannot optimize for this ordering fn try_pushdown_sort( @@ -767,6 +765,16 @@ impl FileSource for ParquetSource { return Ok(SortOrderPushdownResult::Unsupported); } + // Check if the natural (non-reversed) ordering already satisfies the request. + // Parquet metadata guarantees within-file ordering, so if the ordering matches + // we can return Exact. FileScanConfig will verify that files within each group + // are non-overlapping before declaring the entire scan as Exact. + if eq_properties.ordering_satisfy(order.iter().cloned())? 
{ + return Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(self.clone()) as Arc, + }); + } + // Build new equivalence properties with the reversed ordering. // This allows us to check if the reversed ordering satisfies the request // by leveraging: @@ -811,11 +819,6 @@ impl FileSource for ParquetSource { Ok(SortOrderPushdownResult::Inexact { inner: Arc::new(new_source) as Arc, }) - - // TODO Phase 2: Add support for other optimizations: - // - File reordering based on min/max statistics - // - Detection of exact ordering (return Exact to remove Sort operator) - // - Partial sort pushdown for prefix matches } fn apply_expressions( diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 78da70402f93d..a8de21348689d 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -900,28 +900,91 @@ impl DataSource for FileScanConfig { } } + /// Push sort requirements into file-based data sources. + /// + /// # Sort Pushdown Architecture + /// + /// ```text + /// Query: SELECT ... ORDER BY col ASC [LIMIT N] + /// + /// PushdownSort optimizer + /// │ + /// ▼ + /// FileScanConfig::try_pushdown_sort() + /// │ + /// ├─► FileSource::try_pushdown_sort() + /// │ │ + /// │ ├─ natural ordering matches? ──► Exact + /// │ │ (e.g. Parquet WITH ORDER) │ + /// │ │ ▼ + /// │ │ rebuild_with_source(exact=true) + /// │ │ ├─ sort files by stats within groups + /// │ │ ├─ verify non-overlapping + /// │ │ ├─ redistribute across groups (consecutive) + /// │ │ └─► keep output_ordering → SortExec removed + /// │ │ + /// │ ├─ reversed ordering matches? 
──► Inexact + /// │ │ (reverse_row_groups=true) │ + /// │ │ ▼ + /// │ │ rebuild_with_source(exact=false) + /// │ │ ├─ sort files by stats + /// │ │ └─► clear output_ordering → SortExec kept + /// │ │ + /// │ └─ neither ──► Unsupported + /// │ + /// └─► try_sort_file_groups_by_statistics() + /// (best-effort: reorder files by min/max stats) + /// └─► Inexact if reordered, Unsupported if already in order + /// ``` + /// + /// # Result Plans + /// + /// ```text + /// Exact (single partition): DataSourceExec [files sorted, non-overlapping] + /// Exact (multi partition): SPM ─► DataSourceExec [group 0] | [group 1] + /// Inexact (reverse scan): SortExec ─► DataSourceExec [reverse_row_groups] + /// Inexact (stats reorder): SortExec ─► DataSourceExec [files reordered] + /// ``` + /// + /// # Trade-offs + /// + /// - **Exact + single partition**: No sort, no merge, but sequential I/O only. + /// Best for LIMIT queries (reads minimal data and stops). + /// - **Exact + multi partition**: No sort per partition, cheap O(n) SPM merge, + /// parallel I/O. Best for full scans on large datasets. + /// - **Inexact**: Sort still required, but statistics-aware file ordering helps + /// TopK discard data earlier via dynamic filters. fn try_pushdown_sort( &self, order: &[PhysicalSortExpr], ) -> Result>> { - // Delegate to FileSource to see if it can optimize for the requested ordering. let pushdown_result = self .file_source .try_pushdown_sort(order, &self.eq_properties())?; match pushdown_result { SortOrderPushdownResult::Exact { inner } => { - Ok(SortOrderPushdownResult::Exact { - inner: self.rebuild_with_source(inner, true, order)?, - }) + let config = self.rebuild_with_source(inner, true, order)?; + // rebuild_with_source keeps output_ordering only when all groups + // are non-overlapping. If output_ordering was cleared, files + // overlap despite within-file ordering → downgrade to Inexact. 
+ if config.output_ordering.is_empty() { + Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(config), + }) + } else { + Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(config), + }) + } } SortOrderPushdownResult::Inexact { inner } => { Ok(SortOrderPushdownResult::Inexact { - inner: self.rebuild_with_source(inner, false, order)?, + inner: Arc::new(self.rebuild_with_source(inner, false, order)?), }) } SortOrderPushdownResult::Unsupported => { - Ok(SortOrderPushdownResult::Unsupported) + self.try_sort_file_groups_by_statistics(order) } } } @@ -947,6 +1010,13 @@ impl DataSource for FileScanConfig { } } +/// Result of sorting files within groups by their min/max statistics. +struct SortedFileGroups { + file_groups: Vec, + any_reordered: bool, + all_non_overlapping: bool, +} + impl FileScanConfig { /// Returns only the output orderings that are validated against actual /// file group statistics. @@ -1247,26 +1317,51 @@ impl FileScanConfig { &self.file_source } - /// Helper: Rebuild FileScanConfig with new file source + /// Rebuild FileScanConfig after sort pushdown, applying file-level optimizations. + /// + /// This is the core of sort pushdown for file-based sources. It performs + /// three optimizations depending on the pushdown result: + /// + /// ```text + /// ┌─────────────────────────────────────────────────────────────┐ + /// │ rebuild_with_source │ + /// │ │ + /// │ 1. Reverse file groups (if DESC matches reversed ordering) │ + /// │ 2. Sort files within groups by min/max statistics │ + /// │ 3. If Exact + non-overlapping: │ + /// │ a. Redistribute files consecutively across groups │ + /// │ b. Keep output_ordering → SortExec eliminated │ + /// │ Otherwise: clear output_ordering → SortExec stays │ + /// └─────────────────────────────────────────────────────────────┘ + /// ``` + /// + /// # Why sort files by statistics? + /// + /// Files within a partition (file group) are read sequentially. 
By sorting + /// them so that file_i.max <= file_{i+1}.min, the combined output stream + /// is already in order — no SortExec needed for that partition. + /// + /// Even when files overlap (Inexact), statistics-based ordering helps + /// TopK/LIMIT queries: reading low-value files first lets dynamic filters + /// prune high-value files earlier. + /// + /// # Why redistribute across groups? + /// + /// `split_groups_by_statistics` uses bin-packing to balance group sizes, + /// which can interleave file ranges across groups. We fix this by + /// assigning consecutive files to consecutive groups, so groups are + /// ordered relative to each other. This preserves parallel I/O while + /// ensuring SPM's merge is a cheap sequential read. fn rebuild_with_source( &self, new_file_source: Arc, is_exact: bool, order: &[PhysicalSortExpr], - ) -> Result> { + ) -> Result { let mut new_config = self.clone(); // Reverse file order (within each group) if the caller is requesting a reversal of this // scan's declared output ordering. - // - // Historically this function always reversed `file_groups` because it was only reached - // via `FileSource::try_reverse_output` (where a reversal was the only supported - // optimization). - // - // Now that `FileSource::try_pushdown_sort` is generic, we must not assume reversal: other - // optimizations may become possible (e.g. already-sorted data, statistics-based file - // reordering). Therefore we only reverse files when it is known to help satisfy the - // requested ordering. 
let reverse_file_groups = if self.output_ordering.is_empty() { false } else if let Some(requested) = LexOrdering::new(order.iter().cloned()) { @@ -1293,13 +1388,294 @@ impl FileScanConfig { new_config.file_source = new_file_source; - // Phase 1: Clear output_ordering for Inexact - // (we're only reversing row groups, not guaranteeing perfect ordering) - if !is_exact { + // Sort files within groups by statistics when not reversing + let all_non_overlapping = if !reverse_file_groups { + if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) { + let projected_schema = new_config.projected_schema()?; + let projection_indices = new_config + .file_source + .projection() + .as_ref() + .and_then(|p| ordered_column_indices_from_projection(p)); + let result = Self::sort_files_within_groups_by_statistics( + &new_config.file_groups, + &sort_order, + &projected_schema, + projection_indices.as_deref(), + ); + new_config.file_groups = result.file_groups; + result.all_non_overlapping + } else { + false + } + } else { + // When reversing, check if reversed groups are non-overlapping + if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) { + let projected_schema = new_config.projected_schema()?; + let projection_indices = new_config + .file_source + .projection() + .as_ref() + .and_then(|p| ordered_column_indices_from_projection(p)); + let result = Self::sort_files_within_groups_by_statistics( + &new_config.file_groups, + &sort_order, + &projected_schema, + projection_indices.as_deref(), + ); + result.all_non_overlapping + } else { + false + } + }; + + if is_exact && all_non_overlapping { + // Truly exact: within-file ordering guaranteed and files are non-overlapping. + // + // When there are multiple groups, redistribute files using consecutive + // assignment so that each group remains non-overlapping AND groups are + // ordered relative to each other. 
This enables: + // - No SortExec per partition (files in each group are sorted & non-overlapping) + // - SPM cheaply merges ordered streams (O(n) merge) + // - Parallel I/O across partitions + // + // Before (bin-packing may interleave): + // Group 0: [file_01(1-10), file_03(21-30)] ← gap, interleaved with group 1 + // Group 1: [file_02(11-20), file_04(31-40)] + // + // After (consecutive assignment): + // Group 0: [file_01(1-10), file_02(11-20)] ← consecutive, ordered + // Group 1: [file_03(21-30), file_04(31-40)] ← consecutive, ordered + if new_config.file_groups.len() > 1 + && let Some(sort_order) = LexOrdering::new(order.iter().cloned()) + { + let projected_schema = new_config.projected_schema()?; + let projection_indices = new_config + .file_source + .projection() + .as_ref() + .and_then(|p| ordered_column_indices_from_projection(p)); + let num_groups = new_config.file_groups.len(); + new_config.file_groups = + Self::redistribute_files_across_groups_by_statistics( + &new_config.file_groups, + &sort_order, + &projected_schema, + projection_indices.as_deref(), + num_groups, + ); + } + } else { new_config.output_ordering = vec![]; } - Ok(Arc::new(new_config)) + Ok(new_config) + } + + /// Sort files within each file group by their min/max statistics. + /// + /// No files are moved between groups — parallelism and group composition + /// are unchanged. Groups where statistics are unavailable are kept as-is. 
+ /// + /// ```text + /// Before: Group [file_c(20-30), file_a(0-9), file_b(10-19)] + /// After: Group [file_a(0-9), file_b(10-19), file_c(20-30)] + /// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + /// sorted by min value, non-overlapping → Exact + /// ``` + fn sort_files_within_groups_by_statistics( + file_groups: &[FileGroup], + sort_order: &LexOrdering, + projected_schema: &SchemaRef, + projection_indices: Option<&[usize]>, + ) -> SortedFileGroups { + let mut any_reordered = false; + let mut confirmed_non_overlapping: usize = 0; + let mut new_groups = Vec::with_capacity(file_groups.len()); + + for group in file_groups { + if group.len() <= 1 { + new_groups.push(group.clone()); + confirmed_non_overlapping += 1; + continue; + } + + let files: Vec<_> = group.iter().collect(); + + let statistics = match MinMaxStatistics::new_from_files( + sort_order, + projected_schema, + projection_indices, + files.iter().copied(), + ) { + Ok(stats) => stats, + Err(e) => { + log::trace!( + "Cannot sort file group by statistics: {e}. Keeping original order." 
+ ); + new_groups.push(group.clone()); + continue; + } + }; + + let sorted_indices = statistics.min_values_sorted(); + + let already_sorted = sorted_indices + .iter() + .enumerate() + .all(|(pos, (idx, _))| pos == *idx); + + let sorted_group: FileGroup = if already_sorted { + group.clone() + } else { + any_reordered = true; + sorted_indices + .iter() + .map(|(idx, _)| files[*idx].clone()) + .collect() + }; + + let sorted_files: Vec<_> = sorted_group.iter().collect(); + let is_non_overlapping = match MinMaxStatistics::new_from_files( + sort_order, + projected_schema, + projection_indices, + sorted_files.iter().copied(), + ) { + Ok(stats) => stats.is_sorted(), + Err(_) => false, + }; + + if is_non_overlapping { + confirmed_non_overlapping += 1; + } + + new_groups.push(sorted_group); + } + + SortedFileGroups { + file_groups: new_groups, + any_reordered, + all_non_overlapping: confirmed_non_overlapping == file_groups.len(), + } + } + + /// Redistribute files across groups using consecutive assignment. + /// + /// `split_groups_by_statistics` uses bin-packing which balances group sizes + /// but can interleave file ranges. This method fixes that by assigning + /// consecutive sorted files to consecutive groups: + /// + /// ```text + /// Input (bin-packed, interleaved): + /// Group 0: [f1(0-9), f3(20-29)] max(f1)=9 but f3=20 > Group1.f2=10 + /// Group 1: [f2(10-19), f4(30-39)] groups overlap! + /// + /// After global sort + consecutive assignment: + /// Group 0: [f1(0-9), f2(10-19)] max=19 + /// Group 1: [f3(20-29), f4(30-39)] min=20 > 19 ✓ groups are ordered! + /// + /// Resulting plan: + /// SPM [col ASC] ← O(n) merge, reads group 0 then group 1 + /// DataSourceExec [f1, f2] ← parallel I/O, no SortExec + /// DataSourceExec [f3, f4] ← parallel I/O, no SortExec + /// ``` + /// + /// Falls back to the original groups if statistics are unavailable. 
+ fn redistribute_files_across_groups_by_statistics( + file_groups: &[FileGroup], + sort_order: &LexOrdering, + projected_schema: &SchemaRef, + projection_indices: Option<&[usize]>, + num_groups: usize, + ) -> Vec { + if num_groups <= 1 { + return file_groups.to_vec(); + } + + // Flatten all files + let all_files: Vec<_> = file_groups.iter().flat_map(|g| g.iter()).collect(); + if all_files.is_empty() { + return file_groups.to_vec(); + } + + // Sort globally by statistics + let statistics = match MinMaxStatistics::new_from_files( + sort_order, + projected_schema, + projection_indices, + all_files.iter().copied(), + ) { + Ok(stats) => stats, + Err(_) => return file_groups.to_vec(), + }; + + let sorted_indices = statistics.min_values_sorted(); + + // Assign consecutive files to groups + let total = sorted_indices.len(); + let base_size = total / num_groups; + let remainder = total % num_groups; + + let mut new_groups = Vec::with_capacity(num_groups); + let mut offset = 0; + for i in 0..num_groups { + // First `remainder` groups get one extra file + let group_size = base_size + if i < remainder { 1 } else { 0 }; + if group_size == 0 { + continue; + } + let group: FileGroup = sorted_indices[offset..offset + group_size] + .iter() + .map(|(idx, _)| all_files[*idx].clone()) + .collect(); + new_groups.push(group); + offset += group_size; + } + + new_groups + } + + /// Last-resort optimization when FileSource returns `Unsupported`. + /// + /// Even without within-file ordering guarantees, reordering files by + /// min/max statistics still helps: TopK queries with dynamic filters + /// can prune files earlier when files are read in approximate order. + /// + /// Returns `Inexact` (SortExec stays) since we cannot guarantee ordering. + /// Returns `Unsupported` if no files were actually reordered. 
+ fn try_sort_file_groups_by_statistics( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + let Some(sort_order) = LexOrdering::new(order.iter().cloned()) else { + return Ok(SortOrderPushdownResult::Unsupported); + }; + + let projected_schema = self.projected_schema()?; + let projection_indices = self + .file_source + .projection() + .as_ref() + .and_then(|p| ordered_column_indices_from_projection(p)); + + let result = Self::sort_files_within_groups_by_statistics( + &self.file_groups, + &sort_order, + &projected_schema, + projection_indices.as_deref(), + ); + + if !result.any_reordered { + return Ok(SortOrderPushdownResult::Unsupported); + } + + let mut new_config = self.clone(); + new_config.file_groups = result.file_groups; + new_config.output_ordering = vec![]; + Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(new_config), + }) } } @@ -1401,65 +1777,6 @@ fn validate_orderings( .collect() } -/// The various listing tables does not attempt to read all files -/// concurrently, instead they will read files in sequence within a -/// partition. This is an important property as it allows plans to -/// run against 1000s of files and not try to open them all -/// concurrently. 
-/// -/// However, it means if we assign more than one file to a partition -/// the output sort order will not be preserved as illustrated in the -/// following diagrams: -/// -/// When only 1 file is assigned to each partition, each partition is -/// correctly sorted on `(A, B, C)` -/// -/// ```text -/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃ -/// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃ -/// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ -/// DataFusion DataFusion DataFusion DataFusion -/// ┃ Partition 1 Partition 2 Partition 3 Partition 4 ┃ -/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ -/// -/// DataSourceExec -/// ``` -/// -/// However, when more than 1 file is assigned to each partition, each -/// partition is NOT correctly sorted on `(A, B, C)`. 
Once the second -/// file is scanned, the same values for A, B and C can be repeated in -/// the same sorted stream -/// -///```text -/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ -/// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ -/// │ └───────────────┘ │ │ └──────────────┘ ┃ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ -/// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ -/// │ └───────────────┘ │ │ └──────────────┘ ┃ -/// ┃ │ -/// │ │ │ ┃ -/// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ -/// DataFusion DataFusion ┃ -/// ┃ Partition 1 Partition 2 -/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛ -/// -/// DataSourceExec -/// ``` fn get_projected_output_ordering( base_config: &FileScanConfig, projected_schema: &SchemaRef, @@ -2611,4 +2928,545 @@ mod tests { Ok(()) } + + fn make_file_with_stats(name: &str, min: f64, max: f64) -> PartitionedFile { + PartitionedFile::new(name.to_string(), 1024).with_statistics(Arc::new( + Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(1024), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + min_value: Precision::Exact(ScalarValue::Float64(Some(min))), + max_value: Precision::Exact(ScalarValue::Float64(Some(max))), + ..Default::default() + }], + }, + )) + } + + #[derive(Clone)] + struct ExactSortPushdownSource { + metrics: ExecutionPlanMetricsSet, + table_schema: TableSchema, + } + + impl ExactSortPushdownSource { + fn new(table_schema: TableSchema) -> Self { + Self { + metrics: ExecutionPlanMetricsSet::new(), + table_schema, + } + } + } + + impl FileSource for ExactSortPushdownSource { + fn create_file_opener( + &self, + _object_store: Arc, + _base_config: &FileScanConfig, + _partition: usize, + ) -> Result> { + unimplemented!() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn table_schema(&self) -> 
&TableSchema { + &self.table_schema + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc { + Arc::new(self.clone()) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + &self.metrics + } + + fn file_type(&self) -> &str { + "mock_exact" + } + + fn try_pushdown_sort( + &self, + _order: &[PhysicalSortExpr], + _eq_properties: &EquivalenceProperties, + ) -> Result>> { + Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(self.clone()) as Arc, + }) + } + + fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, + ) -> Result { + Ok(TreeNodeRecursion::Continue) + } + } + + #[test] + fn sort_pushdown_unsupported_source_files_get_sorted() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file3", 20.0, 30.0), + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + ])]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files = pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file1"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert_eq!(files[2].object_meta.location.as_ref(), "file3"); + assert!(pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_already_sorted() -> Result<()> { 
+ let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + make_file_with_stats("file3", 20.0, 30.0), + ])]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + assert!(matches!(result, SortOrderPushdownResult::Unsupported)); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_descending_sort() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file3", 20.0, 30.0), + make_file_with_stats("file2", 10.0, 19.0), + ])]; + + let sort_expr = PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + arrow::compute::SortOptions { + descending: true, + nulls_first: true, + }, + ); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files = pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file3"); + 
assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert_eq!(files[2].object_meta.location.as_ref(), "file1"); + Ok(()) + } + + #[test] + fn sort_pushdown_exact_source_non_overlapping_returns_exact() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + make_file_with_stats("file3", 20.0, 30.0), + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Exact { inner } = result else { + panic!("Expected Exact result, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + assert!(!pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn sort_pushdown_exact_source_overlapping_downgraded_to_inexact() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 15.0), + make_file_with_stats("file2", 10.0, 25.0), + make_file_with_stats("file3", 20.0, 30.0), + ])]; + + let config = + 
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact (downgraded), got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + assert!(pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn sort_pushdown_exact_source_out_of_order_returns_exact() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file3", 20.0, 30.0), + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Exact { inner } = result else { + panic!("Expected Exact result, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files = pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file1"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert_eq!(files[2].object_meta.location.as_ref(), "file3"); + assert!(!pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn 
sort_pushdown_unsupported_source_single_file_groups() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![ + FileGroup::new(vec![make_file_with_stats("file1", 0.0, 9.0)]), + FileGroup::new(vec![make_file_with_stats("file2", 10.0, 19.0)]), + ]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + assert!( + matches!(result, SortOrderPushdownResult::Unsupported), + "Expected Unsupported for single-file groups" + ); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_multiple_groups() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![ + FileGroup::new(vec![ + make_file_with_stats("file_b", 10.0, 19.0), + make_file_with_stats("file_a", 0.0, 9.0), + ]), + FileGroup::new(vec![ + make_file_with_stats("file_d", 30.0, 39.0), + make_file_with_stats("file_c", 20.0, 29.0), + ]), + ]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files0 = pushed_config.file_groups[0].files(); + 
assert_eq!(files0[0].object_meta.location.as_ref(), "file_a"); + assert_eq!(files0[1].object_meta.location.as_ref(), "file_b"); + let files1 = pushed_config.file_groups[1].files(); + assert_eq!(files1[0].object_meta.location.as_ref(), "file_c"); + assert_eq!(files1[1].object_meta.location.as_ref(), "file_d"); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_partial_statistics() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![ + FileGroup::new(vec![ + make_file_with_stats("file_b", 10.0, 19.0), + make_file_with_stats("file_a", 0.0, 9.0), + ]), + FileGroup::new(vec![ + PartitionedFile::new("file_d".to_string(), 1024), + PartitionedFile::new("file_c".to_string(), 1024), + ]), + ]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files0 = pushed_config.file_groups[0].files(); + assert_eq!(files0[0].object_meta.location.as_ref(), "file_a"); + assert_eq!(files0[1].object_meta.location.as_ref(), "file_b"); + let files1 = pushed_config.file_groups[1].files(); + assert_eq!(files1[0].object_meta.location.as_ref(), "file_d"); + assert_eq!(files1[1].object_meta.location.as_ref(), "file_c"); + Ok(()) + } + + #[test] + fn sort_pushdown_inexact_source_with_statistics_sorting() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = 
TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(InexactSortPushdownSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file2", 10.0, 19.0), + make_file_with_stats("file1", 0.0, 9.0), + ])]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files = pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file1"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert!(pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn sort_pushdown_exact_multi_group_redistributes_consecutively() -> Result<()> { + // ExactSortPushdownSource + 4 non-overlapping files in 2 interleaved groups + // → files should be redistributed so groups are consecutive and ordered + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + // 2 groups with interleaved ranges (simulating bin-packing result): + // Group 0: [file_01(0-9), file_03(20-29)] + // Group 1: [file_02(10-19), file_04(30-39)] + let file_groups = vec![ + FileGroup::new(vec![ + make_file_with_stats("file_01", 0.0, 9.0), + make_file_with_stats("file_03", 20.0, 29.0), + ]), + FileGroup::new(vec![ + make_file_with_stats("file_02", 10.0, 19.0), + make_file_with_stats("file_04", 30.0, 39.0), 
+ ]), + ]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Exact { inner } = result else { + panic!("Expected Exact result, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + // Should still have 2 groups (preserving parallelism) + assert_eq!(pushed_config.file_groups.len(), 2); + + // Group 0 should have consecutive files [file_01, file_02] + let files0 = pushed_config.file_groups[0].files(); + assert_eq!(files0[0].object_meta.location.as_ref(), "file_01"); + assert_eq!(files0[1].object_meta.location.as_ref(), "file_02"); + + // Group 1 should have consecutive files [file_03, file_04] + let files1 = pushed_config.file_groups[1].files(); + assert_eq!(files1[0].object_meta.location.as_ref(), "file_03"); + assert_eq!(files1[1].object_meta.location.as_ref(), "file_04"); + + // output_ordering preserved + assert!(!pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn sort_pushdown_exact_multi_group_uneven_distribution() -> Result<()> { + // 5 files across 2 groups → group 0 gets 3 files, group 1 gets 2 + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + let file_groups = vec![ + FileGroup::new(vec![ + make_file_with_stats("f1", 0.0, 9.0), + make_file_with_stats("f3", 20.0, 29.0), + make_file_with_stats("f5", 40.0, 49.0), + ]), + FileGroup::new(vec![ + make_file_with_stats("f2", 10.0, 19.0), + make_file_with_stats("f4", 30.0, 39.0), + ]), + ]; 
+ + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Exact { inner } = result else { + panic!("Expected Exact result, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + assert_eq!(pushed_config.file_groups.len(), 2); + // Group 0: 3 files (5/2 = 2 base + 1 remainder) + let files0 = pushed_config.file_groups[0].files(); + assert_eq!(files0.len(), 3); + assert_eq!(files0[0].object_meta.location.as_ref(), "f1"); + assert_eq!(files0[1].object_meta.location.as_ref(), "f2"); + assert_eq!(files0[2].object_meta.location.as_ref(), "f3"); + // Group 1: 2 files + let files1 = pushed_config.file_groups[1].files(); + assert_eq!(files1.len(), 2); + assert_eq!(files1[0].object_meta.location.as_ref(), "f4"); + assert_eq!(files1[1].object_meta.location.as_ref(), "f5"); + Ok(()) + } } diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 20c53ef31645f..cce895371f988 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -35,19 +35,21 @@ //! - `Inexact`: Keep Sort but use optimized input (enables early termination for TopK) //! - `Unsupported`: No change //! -//! ## Current capabilities (Phase 1) +//! ## Capabilities //! -//! - Reverse scan optimization: when required sort is the reverse of the data source's +//! - **Sort elimination**: when data source's natural ordering already satisfies the +//! request (e.g., Parquet files with matching `WITH ORDER`), return `Exact` and +//! remove the `SortExec` entirely +//! - **Reverse scan optimization**: when required sort is the reverse of the data source's //! 
natural ordering, enable reverse scanning (reading row groups in reverse order) -//! - Supports prefix matching: if data has ordering [A DESC, B ASC] and query needs -//! [A ASC], reversing gives [A ASC, B DESC] which satisfies the requirement +//! - **Statistics-based file reordering**: sort files within each group by their min/max +//! statistics to approximate the requested order, improving TopK and limit performance +//! - **Non-overlapping detection**: when files have non-overlapping ranges and matching +//! within-file ordering, the combined scan is `Exact` (sort eliminated) +//! - **Prefix matching**: if data has ordering [A DESC, B ASC] and query needs +//! [A DESC], the existing ordering satisfies the requirement //! -//! TODO Issue: -//! ## Future enhancements (Phase 2), -//! -//! - File reordering based on statistics -//! - Return `Exact` when files are known to be perfectly sorted -//! - Complete Sort elimination when ordering is guaranteed +//! Related issue: use crate::PhysicalOptimizerRule; use datafusion_common::Result; From e2ac2283ff811a79414f0b28e55c028040c76485 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 27 Mar 2026 00:18:13 +0800 Subject: [PATCH 02/17] address review: remove unnecessary stats computation in reverse path, improve docs - Remove dead stats computation in reverse_file_groups branch (reverse path is always Inexact, so all_non_overlapping is unused) - Add reverse prefix matching documentation to pushdown_sort module Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/datasource/src/file_scan_config.rs | 23 ++++--------------- .../physical-optimizer/src/pushdown_sort.rs | 4 +++- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index a8de21348689d..7607204eb4365 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1409,24 +1409,11 @@ 
impl FileScanConfig { false } } else { - // When reversing, check if reversed groups are non-overlapping - if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) { - let projected_schema = new_config.projected_schema()?; - let projection_indices = new_config - .file_source - .projection() - .as_ref() - .and_then(|p| ordered_column_indices_from_projection(p)); - let result = Self::sort_files_within_groups_by_statistics( - &new_config.file_groups, - &sort_order, - &projected_schema, - projection_indices.as_deref(), - ); - result.all_non_overlapping - } else { - false - } + // When reversing, files are already reversed above. We skip + // statistics-based sorting here because it would undo the reversal. + // Note: reverse path is always Inexact, so all_non_overlapping + // is not used (is_exact is false). + false }; if is_exact && all_non_overlapping { diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index cce895371f988..5a201ac2d62f9 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -47,7 +47,9 @@ //! - **Non-overlapping detection**: when files have non-overlapping ranges and matching //! within-file ordering, the combined scan is `Exact` (sort eliminated) //! - **Prefix matching**: if data has ordering [A DESC, B ASC] and query needs -//! [A DESC], the existing ordering satisfies the requirement +//! [A DESC], the existing ordering satisfies the requirement (`Exact`). +//! If the query needs [A ASC] (reverse of the prefix), a reverse scan is +//! used (`Inexact`, `SortExec` retained) //! //! 
Related issue: From 21da48f726357f3cd6581f9a0237a82bc9161600 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 27 Mar 2026 00:19:46 +0800 Subject: [PATCH 03/17] add test: reverse path preserves file order, does not apply stats sorting Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/datasource/src/file_scan_config.rs | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 7607204eb4365..95f45c48fc89c 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -3456,4 +3456,51 @@ mod tests { assert_eq!(files1[1].object_meta.location.as_ref(), "f5"); Ok(()) } + + #[test] + fn sort_pushdown_reverse_preserves_file_order_with_stats() -> Result<()> { + // Reverse scan should reverse file order but NOT apply statistics-based + // sorting (which would undo the reversal). The result is Inexact. + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(InexactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + // Files with stats, in ASC order. Output ordering is [a ASC]. 
+ let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + make_file_with_stats("file3", 20.0, 30.0), + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + // Request DESC → reverse path + let result = config.try_pushdown_sort(&[sort_expr.reverse()])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact for reverse scan, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + // Files should be reversed (not re-sorted by stats) + let files = pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file3"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert_eq!(files[2].object_meta.location.as_ref(), "file1"); + + // output_ordering cleared (Inexact) + assert!(pushed_config.output_ordering.is_empty()); + Ok(()) + } } From 25938bb0812f9ca03e4069348a85faa8150659df Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 27 Mar 2026 17:10:27 +0800 Subject: [PATCH 04/17] restore doc comment with ASCII diagrams for get_projected_output_ordering Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/datasource/src/file_scan_config.rs | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 95f45c48fc89c..27c8b1738123c 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1764,6 +1764,65 @@ fn validate_orderings( .collect() } +/// The various listing tables does not attempt to read all files +/// concurrently, instead they will read files in sequence within a +/// 
partition. This is an important property as it allows plans to +/// run against 1000s of files and not try to open them all +/// concurrently. +/// +/// However, it means if we assign more than one file to a partition +/// the output sort order will not be preserved as illustrated in the +/// following diagrams: +/// +/// When only 1 file is assigned to each partition, each partition is +/// correctly sorted on `(A, B, C)` +/// +/// ```text +/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓ +/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// ┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃ +/// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │ +/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃ +/// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │ +/// ┃ │ │ ┃ +/// │ │ │ │ │ │ +/// ┃ │ │ ┃ +/// │ │ │ │ │ │ +/// ┃ │ │ ┃ +/// │ │ │ │ │ │ +/// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ +/// DataFusion DataFusion DataFusion DataFusion +/// ┃ Partition 1 Partition 2 Partition 3 Partition 4 ┃ +/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ +/// +/// DataSourceExec +/// ``` +/// +/// However, when more than 1 file is assigned to each partition, each +/// partition is NOT correctly sorted on `(A, B, C)`. 
Once the second +/// file is scanned, the same values for A, B and C can be repeated in +/// the same sorted stream +/// +///```text +/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ +/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ +/// ┃ ┌───────────────┐ ┌──────────────┐ │ +/// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃ +/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ +/// │ └───────────────┘ │ │ └──────────────┘ ┃ +/// ┃ ┌───────────────┐ ┌──────────────┐ │ +/// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃ +/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ +/// │ └───────────────┘ │ │ └──────────────┘ ┃ +/// ┃ │ +/// │ │ │ ┃ +/// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ +/// DataFusion DataFusion ┃ +/// ┃ Partition 1 Partition 2 +/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛ +/// +/// DataSourceExec +/// ``` fn get_projected_output_ordering( base_config: &FileScanConfig, projected_schema: &SchemaRef, From 29c526bb153cace4e97e4722088d089e3e8c8645 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 27 Mar 2026 17:11:28 +0800 Subject: [PATCH 05/17] docs: add non-overlapping exception to partition ordering diagram The existing doc comment explains that multi-file partitions break output ordering. Add a note about the exception: when sort pushdown verifies files are non-overlapping via statistics, output_ordering is preserved and SortExec can be eliminated. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/datasource/src/file_scan_config.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 27c8b1738123c..06be269ff2abf 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1823,6 +1823,22 @@ fn validate_orderings( /// /// DataSourceExec /// ``` +/// +/// **Exception**: When files within a partition are **non-overlapping** (verified +/// via min/max statistics) and each file is internally sorted, the combined +/// output is still correctly sorted. Sort pushdown +/// ([`FileScanConfig::try_pushdown_sort`]) detects this case and preserves +/// `output_ordering`, allowing `SortExec` to be eliminated entirely. +/// +/// ```text +/// Partition 1 (files sorted by stats, non-overlapping): +/// ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ +/// │ 1.parquet │ │ 2.parquet │ │ 3.parquet │ +/// │ A: [1..100] │ │ A: [101..200] │ │ A: [201..300] │ +/// │ Sort: A, B, C │ │ Sort: A, B, C │ │ Sort: A, B, C │ +/// └──────────────────┘ └──────────────────┘ └──────────────────┘ +/// max(1) <= min(2) ✓ max(2) <= min(3) ✓ → output_ordering preserved +/// ``` fn get_projected_output_ordering( base_config: &FileScanConfig, projected_schema: &SchemaRef, From aa290d40eca3a2a1b5b6ab3c4d5b261ec85ec434 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Sat, 28 Mar 2026 11:51:58 +0800 Subject: [PATCH 06/17] remove redistribute_files_across_groups_by_statistics Redistributing files consecutively across groups makes SPM read all of partition 0 before starting partition 1 (since all values in group 0 < group 1), degrading multi-partition execution to single-threaded sequential I/O. 
The interleaved assignment from planning-phase bin-packing is actually beneficial: SPM alternates pulling from both partitions, keeping parallel I/O active on both simultaneously. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/datasource/src/file_scan_config.rs | 214 +++--------------- .../sqllogictest/test_files/sort_pushdown.slt | 8 +- 2 files changed, 36 insertions(+), 186 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 06be269ff2abf..b9aceade26293 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -920,7 +920,6 @@ impl DataSource for FileScanConfig { /// │ │ rebuild_with_source(exact=true) /// │ │ ├─ sort files by stats within groups /// │ │ ├─ verify non-overlapping - /// │ │ ├─ redistribute across groups (consecutive) /// │ │ └─► keep output_ordering → SortExec removed /// │ │ /// │ ├─ reversed ordering matches? ──► Inexact @@ -1329,8 +1328,7 @@ impl FileScanConfig { /// │ 1. Reverse file groups (if DESC matches reversed ordering) │ /// │ 2. Sort files within groups by min/max statistics │ /// │ 3. If Exact + non-overlapping: │ - /// │ a. Redistribute files consecutively across groups │ - /// │ b. Keep output_ordering → SortExec eliminated │ + /// │ Keep output_ordering → SortExec eliminated │ /// │ Otherwise: clear output_ordering → SortExec stays │ /// └─────────────────────────────────────────────────────────────┘ /// ``` @@ -1344,14 +1342,6 @@ impl FileScanConfig { /// Even when files overlap (Inexact), statistics-based ordering helps /// TopK/LIMIT queries: reading low-value files first lets dynamic filters /// prune high-value files earlier. - /// - /// # Why redistribute across groups? - /// - /// `split_groups_by_statistics` uses bin-packing to balance group sizes, - /// which can interleave file ranges across groups. 
We fix this by - /// assigning consecutive files to consecutive groups, so groups are - /// ordered relative to each other. This preserves parallel I/O while - /// ensuring SPM's merge is a cheap sequential read. fn rebuild_with_source( &self, new_file_source: Arc, @@ -1418,40 +1408,28 @@ impl FileScanConfig { if is_exact && all_non_overlapping { // Truly exact: within-file ordering guaranteed and files are non-overlapping. + // Keep output_ordering so SortExec can be eliminated for each partition. // - // When there are multiple groups, redistribute files using consecutive - // assignment so that each group remains non-overlapping AND groups are - // ordered relative to each other. This enables: - // - No SortExec per partition (files in each group are sorted & non-overlapping) - // - SPM cheaply merges ordered streams (O(n) merge) - // - Parallel I/O across partitions + // We intentionally do NOT redistribute files across groups here. + // The planning-phase bin-packing may interleave file ranges across groups: // - // Before (bin-packing may interleave): - // Group 0: [file_01(1-10), file_03(21-30)] ← gap, interleaved with group 1 - // Group 1: [file_02(11-20), file_04(31-40)] + // Group 0: [f1(1-10), f3(21-30)] ← interleaved with group 1 + // Group 1: [f2(11-20), f4(31-40)] // - // After (consecutive assignment): - // Group 0: [file_01(1-10), file_02(11-20)] ← consecutive, ordered - // Group 1: [file_03(21-30), file_04(31-40)] ← consecutive, ordered - if new_config.file_groups.len() > 1 - && let Some(sort_order) = LexOrdering::new(order.iter().cloned()) - { - let projected_schema = new_config.projected_schema()?; - let projection_indices = new_config - .file_source - .projection() - .as_ref() - .and_then(|p| ordered_column_indices_from_projection(p)); - let num_groups = new_config.file_groups.len(); - new_config.file_groups = - Self::redistribute_files_across_groups_by_statistics( - &new_config.file_groups, - &sort_order, - &projected_schema, - 
projection_indices.as_deref(), - num_groups, - ); - } + // This interleaving is actually beneficial because SPM pulls from both + // partitions concurrently, keeping parallel I/O active: + // + // SPM: pull P0 [1-10] → pull P1 [11-20] → pull P0 [21-30] → pull P1 [31-40] + // ^^^^^^^^^^^^ ^^^^^^^^^^^^ + // both partitions scanning files simultaneously + // + // If we were to redistribute files consecutively: + // Group 0: [f1(1-10), f2(11-20)] ← all values < group 1 + // Group 1: [f3(21-30), f4(31-40)] + // + // SPM would read ALL of group 0 first (values always smaller), then group 1. + // This degrades to single-threaded sequential I/O — the other partition + // sits idle the entire time, losing the parallelism benefit. } else { new_config.output_ordering = vec![]; } @@ -1547,82 +1525,6 @@ impl FileScanConfig { } } - /// Redistribute files across groups using consecutive assignment. - /// - /// `split_groups_by_statistics` uses bin-packing which balances group sizes - /// but can interleave file ranges. This method fixes that by assigning - /// consecutive sorted files to consecutive groups: - /// - /// ```text - /// Input (bin-packed, interleaved): - /// Group 0: [f1(0-9), f3(20-29)] max(f1)=9 but f3=20 > Group1.f2=10 - /// Group 1: [f2(10-19), f4(30-39)] groups overlap! - /// - /// After global sort + consecutive assignment: - /// Group 0: [f1(0-9), f2(10-19)] max=19 - /// Group 1: [f3(20-29), f4(30-39)] min=20 > 19 ✓ groups are ordered! - /// - /// Resulting plan: - /// SPM [col ASC] ← O(n) merge, reads group 0 then group 1 - /// DataSourceExec [f1, f2] ← parallel I/O, no SortExec - /// DataSourceExec [f3, f4] ← parallel I/O, no SortExec - /// ``` - /// - /// Falls back to the original groups if statistics are unavailable. 
- fn redistribute_files_across_groups_by_statistics( - file_groups: &[FileGroup], - sort_order: &LexOrdering, - projected_schema: &SchemaRef, - projection_indices: Option<&[usize]>, - num_groups: usize, - ) -> Vec { - if num_groups <= 1 { - return file_groups.to_vec(); - } - - // Flatten all files - let all_files: Vec<_> = file_groups.iter().flat_map(|g| g.iter()).collect(); - if all_files.is_empty() { - return file_groups.to_vec(); - } - - // Sort globally by statistics - let statistics = match MinMaxStatistics::new_from_files( - sort_order, - projected_schema, - projection_indices, - all_files.iter().copied(), - ) { - Ok(stats) => stats, - Err(_) => return file_groups.to_vec(), - }; - - let sorted_indices = statistics.min_values_sorted(); - - // Assign consecutive files to groups - let total = sorted_indices.len(); - let base_size = total / num_groups; - let remainder = total % num_groups; - - let mut new_groups = Vec::with_capacity(num_groups); - let mut offset = 0; - for i in 0..num_groups { - // First `remainder` groups get one extra file - let group_size = base_size + if i < remainder { 1 } else { 0 }; - if group_size == 0 { - continue; - } - let group: FileGroup = sorted_indices[offset..offset + group_size] - .iter() - .map(|(idx, _)| all_files[*idx].clone()) - .collect(); - new_groups.push(group); - offset += group_size; - } - - new_groups - } - /// Last-resort optimization when FileSource returns `Unsupported`. /// /// Even without within-file ordering guarantees, reordering files by @@ -3419,9 +3321,12 @@ mod tests { } #[test] - fn sort_pushdown_exact_multi_group_redistributes_consecutively() -> Result<()> { - // ExactSortPushdownSource + 4 non-overlapping files in 2 interleaved groups - // → files should be redistributed so groups are consecutive and ordered + fn sort_pushdown_exact_multi_group_preserves_parallelism() -> Result<()> { + // ExactSortPushdownSource + 4 non-overlapping files in 2 interleaved groups. 
+ // Groups should NOT be redistributed — interleaved groups allow SPM to + // pull from both partitions concurrently, keeping parallel I/O active. + // Redistributing consecutively would make SPM read one partition at a + // time (all values in group 0 < group 1), degrading to single-threaded I/O. let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); @@ -3460,78 +3365,23 @@ mod tests { .downcast_ref::() .expect("Expected FileScanConfig"); - // Should still have 2 groups (preserving parallelism) + // 2 groups preserved (parallelism maintained) assert_eq!(pushed_config.file_groups.len(), 2); - // Group 0 should have consecutive files [file_01, file_02] + // Files within each group are sorted by stats, but groups are NOT + // redistributed — interleaved assignment from bin-packing is kept let files0 = pushed_config.file_groups[0].files(); assert_eq!(files0[0].object_meta.location.as_ref(), "file_01"); - assert_eq!(files0[1].object_meta.location.as_ref(), "file_02"); - - // Group 1 should have consecutive files [file_03, file_04] + assert_eq!(files0[1].object_meta.location.as_ref(), "file_03"); let files1 = pushed_config.file_groups[1].files(); - assert_eq!(files1[0].object_meta.location.as_ref(), "file_03"); + assert_eq!(files1[0].object_meta.location.as_ref(), "file_02"); assert_eq!(files1[1].object_meta.location.as_ref(), "file_04"); - // output_ordering preserved + // output_ordering preserved (Exact, each group internally non-overlapping) assert!(!pushed_config.output_ordering.is_empty()); Ok(()) } - #[test] - fn sort_pushdown_exact_multi_group_uneven_distribution() -> Result<()> { - // 5 files across 2 groups → group 0 gets 3 files, group 1 gets 2 - let file_schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); - let file_source = 
Arc::new(ExactSortPushdownSource::new(table_schema)); - - let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); - - let file_groups = vec![ - FileGroup::new(vec![ - make_file_with_stats("f1", 0.0, 9.0), - make_file_with_stats("f3", 20.0, 29.0), - make_file_with_stats("f5", 40.0, 49.0), - ]), - FileGroup::new(vec![ - make_file_with_stats("f2", 10.0, 19.0), - make_file_with_stats("f4", 30.0, 39.0), - ]), - ]; - - let config = - FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) - .with_file_groups(file_groups) - .with_output_ordering(vec![ - LexOrdering::new(vec![sort_expr.clone()]).unwrap(), - ]) - .build(); - - let result = config.try_pushdown_sort(&[sort_expr])?; - let SortOrderPushdownResult::Exact { inner } = result else { - panic!("Expected Exact result, got {result:?}"); - }; - let pushed_config = inner - .as_any() - .downcast_ref::() - .expect("Expected FileScanConfig"); - - assert_eq!(pushed_config.file_groups.len(), 2); - // Group 0: 3 files (5/2 = 2 base + 1 remainder) - let files0 = pushed_config.file_groups[0].files(); - assert_eq!(files0.len(), 3); - assert_eq!(files0[0].object_meta.location.as_ref(), "f1"); - assert_eq!(files0[1].object_meta.location.as_ref(), "f2"); - assert_eq!(files0[2].object_meta.location.as_ref(), "f3"); - // Group 1: 2 files - let files1 = pushed_config.file_groups[1].files(); - assert_eq!(files1.len(), 2); - assert_eq!(files1[0].object_meta.location.as_ref(), "f4"); - assert_eq!(files1[1].object_meta.location.as_ref(), "f5"); - Ok(()) - } - #[test] fn sort_pushdown_reverse_preserves_file_order_with_stats() -> Result<()> { // Reverse scan should reverse file order but NOT apply statistics-based diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index c6d5df0d0dfee..3ed45eb03760c 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -1109,7 
+1109,7 @@ logical_plan 02)--TableScan: reversed_parquet projection=[id, value] physical_plan 01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet]]}, projection=[id, value], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], file_type=parquet # Test 4.2: Results must be correct query II @@ -1193,7 +1193,7 @@ logical_plan 02)--TableScan: reversed_with_order_parquet projection=[id, value] physical_plan 01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet]]}, projection=[id, value], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], file_type=parquet # Test 6.2: Results must be correct query II @@ -1332,7 +1332,7 @@ logical_plan 
02)--TableScan: desc_reversed_parquet projection=[id, value] physical_plan 01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/a_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/b_high.parquet]]}, projection=[id, value], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/b_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/a_low.parquet]]}, projection=[id, value], file_type=parquet # Test 8.2: Results must be correct query II @@ -1775,7 +1775,7 @@ logical_plan 02)--TableScan: tb_overlap projection=[id, value] physical_plan 01)SortExec: TopK(fetch=5), expr=[id@0 DESC, value@1 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_x.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_y.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_z.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_z.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_y.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_x.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] query II SELECT * FROM tb_overlap ORDER BY id DESC, value DESC LIMIT 5; From b3ed744039134da86366c5fd1b3b7864d761db1f Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 31 Mar 
2026 10:38:31 +0800 Subject: [PATCH 07/17] fix: re-check ordering after statistics-based file sorting When FileSource returns Unsupported (because validated_output_ordering stripped the ordering due to wrong file order), sort files by statistics and re-check: if files are now non-overlapping and output_ordering is valid, upgrade to Exact (eliminating SortExec). This handles the key case where files have correct within-file ordering (Parquet sorting_columns metadata) but were listed in wrong order (e.g., alphabetical order doesn't match sort key order). Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/datasource/src/file_scan_config.rs | 36 ++++++++++++++++--- .../sqllogictest/test_files/sort_pushdown.slt | 22 +++++------- 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index b9aceade26293..99221012eea55 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1527,12 +1527,21 @@ impl FileScanConfig { /// Last-resort optimization when FileSource returns `Unsupported`. /// - /// Even without within-file ordering guarantees, reordering files by - /// min/max statistics still helps: TopK queries with dynamic filters - /// can prune files earlier when files are read in approximate order. + /// FileSource may return `Unsupported` because `eq_properties` had no + /// ordering — which happens when `validated_output_ordering()` stripped + /// the ordering because files were in the wrong order. After sorting + /// files by statistics, the ordering may become valid again. /// - /// Returns `Inexact` (SortExec stays) since we cannot guarantee ordering. - /// Returns `Unsupported` if no files were actually reordered. + /// This method: + /// 1. Sorts files within groups by min/max statistics + /// 2. Re-checks if the sorted file order makes `output_ordering` valid + /// 3. 
If valid AND non-overlapping → `Exact` (SortExec eliminated!) + /// 4. If files were reordered but ordering not valid → `Inexact` + /// 5. If no files were reordered → `Unsupported` + /// + /// This handles the key case where files have correct within-file ordering + /// (e.g., Parquet sorting_columns metadata) but were listed in wrong order + /// (e.g., alphabetical order doesn't match sort key order). fn try_sort_file_groups_by_statistics( &self, order: &[PhysicalSortExpr], @@ -1561,6 +1570,23 @@ impl FileScanConfig { let mut new_config = self.clone(); new_config.file_groups = result.file_groups; + + // Re-check: now that files are sorted, does output_ordering become valid? + // This handles the case where validated_output_ordering() previously + // stripped the ordering because files were in the wrong order. + if result.all_non_overlapping && !self.output_ordering.is_empty() { + // Files are now non-overlapping and we have declared output_ordering. + // Re-ask the FileSource if this ordering satisfies the request, + // using eq_properties computed from the NEW (sorted) file groups. + let new_eq_props = new_config.eq_properties(); + if new_eq_props.ordering_satisfy(order.iter().cloned())? { + // The sorted file order makes the ordering valid → Exact! 
+ return Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(new_config), + }); + } + } + new_config.output_ordering = vec![]; Ok(SortOrderPushdownResult::Inexact { inner: Arc::new(new_config), diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 3ed45eb03760c..cba95c13e3dcb 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -1100,16 +1100,15 @@ CREATE EXTERNAL TABLE reversed_parquet(id INT, value INT) STORED AS PARQUET LOCATION 'test_files/scratch/sort_pushdown/reversed/'; -# Test 4.1: SortExec must be present because files are not in inter-file order +# Test 4.1: Files sorted by statistics → non-overlapping → SortExec eliminated +# (files reordered from [a_high, b_mid, c_low] to [c_low, b_mid, a_high]) query TT EXPLAIN SELECT * FROM reversed_parquet ORDER BY id ASC; ---- logical_plan 01)Sort: reversed_parquet.id ASC NULLS LAST 02)--TableScan: reversed_parquet projection=[id, value] -physical_plan -01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], file_type=parquet +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet # Test 4.2: Results must be correct query II @@ 
-1184,16 +1183,15 @@ STORED AS PARQUET LOCATION 'test_files/scratch/sort_pushdown/reversed/' WITH ORDER (id ASC); -# Test 6.1: SortExec must be present despite WITH ORDER +# Test 6.1: Files sorted by statistics → non-overlapping → SortExec eliminated +# WITH ORDER declared + files reordered to correct order query TT EXPLAIN SELECT * FROM reversed_with_order_parquet ORDER BY id ASC; ---- logical_plan 01)Sort: reversed_with_order_parquet.id ASC NULLS LAST 02)--TableScan: reversed_with_order_parquet projection=[id, value] -physical_plan -01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], file_type=parquet +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet # Test 6.2: Results must be correct query II @@ -1322,17 +1320,15 @@ STORED AS PARQUET LOCATION 'test_files/scratch/sort_pushdown/desc_reversed/' WITH ORDER (id DESC); -# Test 8.1: SortExec must be present — files are in wrong inter-file DESC order -# (a_low has 1-3, b_high has 7-9; for DESC, b_high should come first) +# Test 8.1: Files sorted by statistics → non-overlapping → SortExec eliminated +# (files reordered: b_high(7-9) before a_low(1-3) for DESC order) query TT EXPLAIN SELECT * FROM desc_reversed_parquet ORDER BY id DESC; ---- logical_plan 01)Sort: 
desc_reversed_parquet.id DESC NULLS FIRST 02)--TableScan: desc_reversed_parquet projection=[id, value] -physical_plan -01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/b_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/a_low.parquet]]}, projection=[id, value], file_type=parquet +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/b_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/a_low.parquet]]}, projection=[id, value], output_ordering=[id@0 DESC], file_type=parquet # Test 8.2: Results must be correct query II From 5718f8ae0eae5123c69a8a1f2c1705f17a601532 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 2 Apr 2026 15:14:52 +0800 Subject: [PATCH 08/17] fix: preserve fetch (LIMIT) when eliminating SortExec via Exact pushdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When PushdownSort returns Exact and removes SortExec, the fetch (LIMIT) from the original SortExec was lost. DataSourceExec then reads all data instead of stopping early. Now passes sort_exec.fetch() to inner.with_fetch() so DataSourceExec gets limit=N and stops reading after N rows. 
Benchmark impact (6M rows, reversed file names): Q2 ORDER BY LIMIT 100: main 1237ms → 9ms (99% faster) Q4 SELECT * LIMIT 100: main 5971ms → 24ms (99.6% faster) Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-optimizer/src/pushdown_sort.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 5a201ac2d62f9..6a3ef9e06a04e 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -99,7 +99,14 @@ impl PhysicalOptimizerRule for PushdownSort { // Each node type defines its own pushdown behavior via try_pushdown_sort() match sort_input.try_pushdown_sort(required_ordering)? { SortOrderPushdownResult::Exact { inner } => { - // Data source guarantees perfect ordering - remove the Sort operator + // Data source guarantees perfect ordering - remove the Sort operator. + // Preserve the fetch (LIMIT) from the original SortExec so the + // data source can stop reading early. 
+ let inner = if let Some(fetch) = sort_exec.fetch() { + inner.with_fetch(Some(fetch)).unwrap_or(inner) + } else { + inner + }; Ok(Transformed::yes(inner)) } SortOrderPushdownResult::Inexact { inner } => { From 28661d4f03edb244085c45a19189b57d001f6f93 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 3 Apr 2026 10:23:06 +0800 Subject: [PATCH 09/17] docs: accurate PR description and code comments for sort pushdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Update architecture comments and module docs to reflect actual behavior: - Core value: fix wrong file order via statistics sorting → Exact upgrade - EnforceSorting already handles correct file order cases - PushdownSort only triggers when byte-range split hasn't happened - Removed misleading claims about multi-partition optimization Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/datasource/src/file_scan_config.rs | 80 +++++++++---------- .../physical-optimizer/src/pushdown_sort.rs | 12 ++- 2 files changed, 42 insertions(+), 50 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 99221012eea55..0ceaef1fd0bdb 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -902,57 +902,51 @@ impl DataSource for FileScanConfig { /// Push sort requirements into file-based data sources. /// - /// # Sort Pushdown Architecture + /// # When does this matter? /// - /// ```text - /// Query: SELECT ... ORDER BY col ASC [LIMIT N] + /// `EnforceSorting` (which runs before `PushdownSort`) already eliminates + /// `SortExec` when `validated_output_ordering()` confirms the file order is + /// correct. However, if files are listed in wrong order (e.g., alphabetical + /// order doesn't match sort key order), `validated_output_ordering()` strips + /// the ordering and `EnforceSorting` cannot help. 
/// - /// PushdownSort optimizer - /// │ - /// ▼ - /// FileScanConfig::try_pushdown_sort() - /// │ - /// ├─► FileSource::try_pushdown_sort() - /// │ │ - /// │ ├─ natural ordering matches? ──► Exact - /// │ │ (e.g. Parquet WITH ORDER) │ - /// │ │ ▼ - /// │ │ rebuild_with_source(exact=true) - /// │ │ ├─ sort files by stats within groups - /// │ │ ├─ verify non-overlapping - /// │ │ └─► keep output_ordering → SortExec removed - /// │ │ - /// │ ├─ reversed ordering matches? ──► Inexact - /// │ │ (reverse_row_groups=true) │ - /// │ │ ▼ - /// │ │ rebuild_with_source(exact=false) - /// │ │ ├─ sort files by stats - /// │ │ └─► clear output_ordering → SortExec kept - /// │ │ - /// │ └─ neither ──► Unsupported - /// │ - /// └─► try_sort_file_groups_by_statistics() - /// (best-effort: reorder files by min/max stats) - /// └─► Inexact if reordered, Unsupported if already in order - /// ``` + /// This is where `PushdownSort` adds value: it **sorts files by statistics** + /// to fix the ordering, then re-checks — enabling sort elimination even when + /// files were originally in wrong order. 
/// - /// # Result Plans + /// # Architecture /// /// ```text - /// Exact (single partition): DataSourceExec [files sorted, non-overlapping] - /// Exact (multi partition): SPM ─► DataSourceExec [group 0] | [group 1] - /// Inexact (reverse scan): SortExec ─► DataSourceExec [reverse_row_groups] - /// Inexact (stats reorder): SortExec ─► DataSourceExec [files reordered] + /// PushdownSort optimizer finds SortExec + /// │ + /// ▼ + /// FileScanConfig::try_pushdown_sort() + /// │ + /// ├─► FileSource returns Exact + /// │ (natural ordering already satisfies request) + /// │ → rebuild_with_source: sort files by stats, verify non-overlapping + /// │ → SortExec removed, fetch (LIMIT) pushed to DataSourceExec + /// │ + /// ├─► FileSource returns Inexact + /// │ (reverse_row_groups=true) + /// │ → SortExec kept, scan optimized + /// │ + /// └─► FileSource returns Unsupported + /// (ordering was stripped because files in wrong order) + /// → try_sort_file_groups_by_statistics(): + /// 1. Sort files within groups by min/max statistics + /// 2. Re-check: are files now non-overlapping + ordering valid? + /// YES → upgrade to Exact → SortExec removed + /// NO → Inexact (files reordered but Sort stays) /// ``` /// - /// # Trade-offs + /// # Note on multi-partition plans /// - /// - **Exact + single partition**: No sort, no merge, but sequential I/O only. - /// Best for LIMIT queries (reads minimal data and stops). - /// - **Exact + multi partition**: No sort per partition, cheap O(n) SPM merge, - /// parallel I/O. Best for full scans on large datasets. - /// - **Inexact**: Sort still required, but statistics-aware file ordering helps - /// TopK discard data earlier via dynamic filters. + /// In the default configuration, `EnforceDistribution` byte-range splits + /// files into single-file groups before `PushdownSort` runs. Single-file + /// groups pass `validated_output_ordering()` trivially, so `EnforceSorting` + /// already eliminates `SortExec`. 
In this case, `PushdownSort` finds no + /// `SortExec` and does nothing. fn try_pushdown_sort( &self, order: &[PhysicalSortExpr], diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 6a3ef9e06a04e..4aa7dcbdf7288 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -37,15 +37,13 @@ //! //! ## Capabilities //! -//! - **Sort elimination**: when data source's natural ordering already satisfies the -//! request (e.g., Parquet files with matching `WITH ORDER`), return `Exact` and -//! remove the `SortExec` entirely +//! - **Statistics-based file sorting + sort elimination**: when files within a +//! partition are non-overlapping and internally sorted but listed in wrong order, +//! sort them by min/max statistics to fix the ordering. After sorting, the +//! ordering becomes valid and `SortExec` can be removed entirely. Also preserves +//! `fetch` (LIMIT) from the eliminated `SortExec` for early termination. //! - **Reverse scan optimization**: when required sort is the reverse of the data source's //! natural ordering, enable reverse scanning (reading row groups in reverse order) -//! - **Statistics-based file reordering**: sort files within each group by their min/max -//! statistics to approximate the requested order, improving TopK and limit performance -//! - **Non-overlapping detection**: when files have non-overlapping ranges and matching -//! within-file ordering, the combined scan is `Exact` (sort eliminated) //! - **Prefix matching**: if data has ordering [A DESC, B ASC] and query needs //! [A DESC], the existing ordering satisfies the requirement (`Exact`). //! 
If the query needs [A ASC] (reverse of the prefix), a reverse scan is From 911f0ddb7d7242d3fe2ee062036a5b7cc2918814 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 3 Apr 2026 10:32:10 +0800 Subject: [PATCH 10/17] docs: fix PR description and code comments to accurately describe sort pushdown behavior --- datafusion/datasource/src/file_scan_config.rs | 38 +++++++------------ .../physical-optimizer/src/pushdown_sort.rs | 11 ++++-- 2 files changed, 20 insertions(+), 29 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 0ceaef1fd0bdb..96a7448a7ac04 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -902,19 +902,15 @@ impl DataSource for FileScanConfig { /// Push sort requirements into file-based data sources. /// - /// # When does this matter? + /// # Sort Pushdown Architecture /// - /// `EnforceSorting` (which runs before `PushdownSort`) already eliminates - /// `SortExec` when `validated_output_ordering()` confirms the file order is - /// correct. However, if files are listed in wrong order (e.g., alphabetical - /// order doesn't match sort key order), `validated_output_ordering()` strips - /// the ordering and `EnforceSorting` cannot help. + /// When a partition (file group) contains multiple files in wrong order, + /// `validated_output_ordering()` strips the ordering and `EnforceSorting` + /// inserts a `SortExec`. This optimizer fixes the file order by sorting + /// files within each group by min/max statistics, enabling sort elimination. /// - /// This is where `PushdownSort` adds value: it **sorts files by statistics** - /// to fix the ordering, then re-checks — enabling sort elimination even when - /// files were originally in wrong order. - /// - /// # Architecture + /// This applies to both single-partition and multi-partition plans — any + /// file group with multiple files in wrong order benefits. 
/// /// ```text /// PushdownSort optimizer finds SortExec @@ -923,7 +919,7 @@ impl DataSource for FileScanConfig { /// FileScanConfig::try_pushdown_sort() /// │ /// ├─► FileSource returns Exact - /// │ (natural ordering already satisfies request) + /// │ (natural ordering satisfies request) /// │ → rebuild_with_source: sort files by stats, verify non-overlapping /// │ → SortExec removed, fetch (LIMIT) pushed to DataSourceExec /// │ @@ -932,21 +928,13 @@ impl DataSource for FileScanConfig { /// │ → SortExec kept, scan optimized /// │ /// └─► FileSource returns Unsupported - /// (ordering was stripped because files in wrong order) + /// (ordering stripped because files in wrong order) /// → try_sort_file_groups_by_statistics(): - /// 1. Sort files within groups by min/max statistics - /// 2. Re-check: are files now non-overlapping + ordering valid? - /// YES → upgrade to Exact → SortExec removed - /// NO → Inexact (files reordered but Sort stays) + /// 1. Sort files within each group by min/max statistics + /// 2. Re-check: non-overlapping + ordering valid? + /// YES → Exact → SortExec removed + /// NO → Inexact (files reordered, Sort stays) /// ``` - /// - /// # Note on multi-partition plans - /// - /// In the default configuration, `EnforceDistribution` byte-range splits - /// files into single-file groups before `PushdownSort` runs. Single-file - /// groups pass `validated_output_ordering()` trivially, so `EnforceSorting` - /// already eliminates `SortExec`. In this case, `PushdownSort` finds no - /// `SortExec` and does nothing. fn try_pushdown_sort( &self, order: &[PhysicalSortExpr], diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 4aa7dcbdf7288..7aaa510c854f7 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -37,11 +37,14 @@ //! //! ## Capabilities //! -//! 
- **Statistics-based file sorting + sort elimination**: when files within a -//! partition are non-overlapping and internally sorted but listed in wrong order, -//! sort them by min/max statistics to fix the ordering. After sorting, the -//! ordering becomes valid and `SortExec` can be removed entirely. Also preserves +//! - **Sort elimination**: when a data source's natural ordering satisfies the +//! request, return `Exact` and remove the `SortExec` entirely. Preserves //! `fetch` (LIMIT) from the eliminated `SortExec` for early termination. +//! - **Statistics-based file sorting**: sort files within each partition by +//! min/max statistics. When files are non-overlapping but listed in wrong +//! order (e.g., alphabetical order ≠ sort key order), this fixes the ordering +//! and enables sort elimination. Works for both single-partition and +//! multi-partition plans with multi-file groups. //! - **Reverse scan optimization**: when required sort is the reverse of the data source's //! natural ordering, enable reverse scanning (reading row groups in reverse order) //! - **Prefix matching**: if data has ordering [A DESC, B ASC] and query needs From 35a581cf7dd9bf2dab780e76abcb31aa39c56d61 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 3 Apr 2026 14:21:50 +0800 Subject: [PATCH 11/17] perf: increase SortPreservingMergeExec prefetch buffer from 1 to 16 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When SPM reads directly from I/O-bound sources (e.g., DataSourceExec without SortExec buffering), the merge loop stalls waiting for Parquet I/O on each poll. Increasing the prefetch buffer lets background tasks read ahead while the merge processes previous batches. 
Local benchmark (release, 16 partitions, sort_pushdown_sorted): Q1 full scan: Main 110ms → PR 82ms (1.3x faster) Q3 SELECT *: Main 239ms → PR 228ms (1.05x faster) Q2/Q4 LIMIT: 3-7ms (unchanged, already fast) --- datafusion/physical-plan/src/sorts/sort_preserving_merge.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index b77cced35504b..c2a9369c723ee 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -361,7 +361,7 @@ impl ExecutionPlan for SortPreservingMergeExec { .map(|partition| { let stream = self.input.execute(partition, Arc::clone(&context))?; - Ok(spawn_buffered(stream, 1)) + Ok(spawn_buffered(stream, 16)) }) .collect::<Result<_>>()?; From 59e99a4a435c2d6891671e3dca78709133047f34 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Sat, 4 Apr 2026 11:27:29 +0800 Subject: [PATCH 12/17] fix: prevent sort elimination when files have NULLs in sort columns Add null-count check in try_sort_file_groups_by_statistics to avoid claiming Exact when any file contains NULLs. With NULLS LAST/FIRST, eliminating SortExec could produce wrong row ordering across files. Includes unit tests and SLT Test F for NULL handling coverage. 
--- datafusion/datasource/src/file_scan_config.rs | 156 +++++++++++++++++- .../sqllogictest/test_files/sort_pushdown.slt | 72 ++++++++ 2 files changed, 226 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 96a7448a7ac04..976e1158f5eb7 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -27,6 +27,7 @@ use crate::{ use arrow::datatypes::FieldRef; use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::config::ConfigOptions; +use datafusion_common::stats::Precision; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err, @@ -1556,8 +1557,25 @@ impl FileScanConfig { // Re-check: now that files are sorted, does output_ordering become valid? // This handles the case where validated_output_ordering() previously // stripped the ordering because files were in the wrong order. - if result.all_non_overlapping && !self.output_ordering.is_empty() { - // Files are now non-overlapping and we have declared output_ordering. + // + // IMPORTANT: We cannot claim Exact if any file in a non-last position + // contains NULLs in the sort columns. With NULLS LAST, NULLs within + // a file are placed after all non-null values. If the next file has + // non-null values smaller than the previous file's max, those values + // would incorrectly appear after the NULLs. Similarly for NULLS FIRST. + // + // Conservative approach: if any file has nulls in the sort columns, + // do not claim Exact. The SortExec will handle NULL ordering correctly. + if result.all_non_overlapping + && !self.output_ordering.is_empty() + && !Self::any_file_has_nulls_in_sort_columns( + &new_config.file_groups, + order, + &projected_schema, + projection_indices.as_deref(), + ) + { + // Files are now non-overlapping, no NULLs in sort columns. 
// Re-ask the FileSource if this ordering satisfies the request, // using eq_properties computed from the NEW (sorted) file groups. let new_eq_props = new_config.eq_properties(); @@ -1574,6 +1592,43 @@ impl FileScanConfig { inner: Arc::new(new_config), }) } + + /// Check if any file in any group has nulls in the sort columns. + fn any_file_has_nulls_in_sort_columns( + file_groups: &[FileGroup], + order: &[PhysicalSortExpr], + projected_schema: &SchemaRef, + projection_indices: Option<&[usize]>, + ) -> bool { + let Some(sort_columns) = + sort_columns_from_physical_sort_exprs_nullable(order, projected_schema) + else { + return true; // Can't determine, assume nulls exist + }; + + for group in file_groups { + for file in group.iter() { + let Some(stats) = file.statistics.as_ref() else { + return true; // No stats, assume nulls exist + }; + for col in &sort_columns { + let stat_idx = projection_indices + .map(|p| p[col.index()]) + .unwrap_or_else(|| col.index()); + if stat_idx >= stats.column_statistics.len() { + return true; + } + let col_stats = &stats.column_statistics[stat_idx]; + match &col_stats.null_count { + Precision::Exact(0) => {} // No nulls, safe + Precision::Exact(_) => return true, // Has nulls + _ => return true, // Unknown null count, assume nulls + } + } + } + } + false + } } impl Debug for FileScanConfig { @@ -1629,6 +1684,17 @@ fn ordered_column_indices_from_projection( .collect::>>() } +/// Extract Column references from sort expressions for null checking. +fn sort_columns_from_physical_sort_exprs_nullable( + order: &[PhysicalSortExpr], + _schema: &SchemaRef, +) -> Option<Vec<Column>> { + order + .iter() + .map(|expr| expr.expr.as_any().downcast_ref::<Column>().cloned()) + .collect() +} + /// Check whether a given ordering is valid for all file groups by verifying /// that files within each group are sorted according to their min/max statistics. 
/// @@ -3436,4 +3502,90 @@ mod tests { assert!(pushed_config.output_ordering.is_empty()); Ok(()) } + + /// Helper: create a PartitionedFile with stats including null count + fn make_file_with_null_stats( + name: &str, + min: f64, + max: f64, + null_count: usize, + ) -> PartitionedFile { + PartitionedFile::new(name.to_string(), 1024).with_statistics(Arc::new( + Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(1024), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(null_count), + min_value: Precision::Exact(ScalarValue::Float64(Some(min))), + max_value: Precision::Exact(ScalarValue::Float64(Some(max))), + ..Default::default() + }], + }, + )) + } + + #[test] + fn sort_pushdown_unsupported_with_nulls_does_not_upgrade_to_exact() -> Result<()> { + // Files are non-overlapping but one has NULLs. + // Should NOT upgrade to Exact — NULLs would appear in wrong position. + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + // Files in wrong order (high min first) to trigger reordering + let file_groups = vec![FileGroup::new(vec![ + make_file_with_null_stats("b_no_nulls", 10.0, 19.0, 0), + make_file_with_null_stats("a_with_nulls", 0.0, 9.0, 5), // has NULLs + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + // Should be Inexact (not Exact) because of NULLs + assert!( + matches!(result, SortOrderPushdownResult::Inexact { .. 
}), + "Expected Inexact due to NULLs, got {result:?}" + ); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_no_nulls_upgrades_to_exact() -> Result<()> { + // Files are non-overlapping, no NULLs → should upgrade to Exact + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_null_stats("b_high", 10.0, 19.0, 0), + make_file_with_null_stats("a_low", 0.0, 9.0, 0), + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + assert!( + matches!(result, SortOrderPushdownResult::Exact { .. }), + "Expected Exact (no NULLs), got {result:?}" + ); + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index cba95c13e3dcb..370bdaff47130 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2088,6 +2088,78 @@ DROP TABLE te_src_c; statement ok DROP TABLE te_inferred_multi; +# =========================================================== +# Test F: NULL handling — sort pushdown must not eliminate Sort +# when files contain NULLs in sort columns, because NULL ordering +# (NULLS FIRST/LAST) across files requires a full sort. 
+# =========================================================== + +# Test F.1: NULLS LAST — file with NULL must not cause wrong ordering +statement ok +CREATE TABLE null_src_a(id INT) AS VALUES (1), (NULL); + +statement ok +CREATE TABLE null_src_b(id INT) AS VALUES (2), (3); + +query I +COPY (SELECT * FROM null_src_a ORDER BY id ASC NULLS LAST) +TO 'test_files/scratch/sort_pushdown/tf_nulls/b_null_tail.parquet'; +---- +2 + +query I +COPY (SELECT * FROM null_src_b ORDER BY id ASC NULLS LAST) +TO 'test_files/scratch/sort_pushdown/tf_nulls/a_nonnull.parquet'; +---- +2 + +statement ok +CREATE EXTERNAL TABLE tf_nulls_last(id INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/tf_nulls/' +WITH ORDER (id ASC NULLS LAST); + +# With target_partitions=1, files end up in separate groups via +# split_groups_by_statistics. EnforceSorting eliminates SortExec, +# SPM merges the two sorted streams. +query TT +EXPLAIN SELECT * FROM tf_nulls_last ORDER BY id ASC NULLS LAST; +---- +logical_plan +01)Sort: tf_nulls_last.id ASC NULLS LAST +02)--TableScan: tf_nulls_last projection=[id] +physical_plan +01)SortPreservingMergeExec: [id@0 ASC NULLS LAST] +02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tf_nulls/a_nonnull.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tf_nulls/b_null_tail.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# Results must be correct: NULLs at the end +query I +SELECT * FROM tf_nulls_last ORDER BY id ASC NULLS LAST; +---- +1 +2 +3 +NULL + +# Test F.2: NULLS FIRST — NULLs should come first +query I +SELECT * FROM tf_nulls_last ORDER BY id ASC NULLS FIRST; +---- +NULL +1 +2 +3 + +# Cleanup Test F +statement ok +DROP TABLE null_src_a; + +statement ok +DROP TABLE null_src_b; + +statement ok +DROP TABLE tf_nulls_last; + # Reset settings (SLT runner uses target_partitions=4, not system default) statement ok SET 
datafusion.execution.target_partitions = 4; From c1d0a3395a4077b43338010af367671773a5d1ff Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Sat, 4 Apr 2026 11:54:44 +0800 Subject: [PATCH 13/17] fix comments --- datafusion/sqllogictest/test_files/sort_pushdown.slt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 370bdaff47130..8a7df04fae3de 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -1100,7 +1100,8 @@ CREATE EXTERNAL TABLE reversed_parquet(id INT, value INT) STORED AS PARQUET LOCATION 'test_files/scratch/sort_pushdown/reversed/'; -# Test 4.1: Files sorted by statistics → non-overlapping → SortExec eliminated +# Test 4.1: PushdownSort reorders files by min/max statistics so they are +# already in correct sort order → non-overlapping → no SortExec needed. # (files reordered from [a_high, b_mid, c_low] to [c_low, b_mid, a_high]) query TT EXPLAIN SELECT * FROM reversed_parquet ORDER BY id ASC; From 7cadeca0927f5b4cd231f09146f6e0790b58e496 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Sat, 4 Apr 2026 12:58:07 +0800 Subject: [PATCH 14/17] refactor: scope SPM prefetch buffer to sort elimination path only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add prefetch field to SortPreservingMergeExec (default 1) instead of hardcoding buffer=16 globally. PushdownSort sets prefetch=16 only when eliminating SortExec under SPM, avoiding impact on unrelated queries. Also detect SPM → SortExec(preserve_partitioning) pattern directly in PushdownSort to handle sort elimination and prefetch in one pass. 
--- .../physical-optimizer/src/pushdown_sort.rs | 58 ++++++++++++++++++- .../src/sorts/sort_preserving_merge.rs | 32 +++++++++- 2 files changed, 87 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 7aaa510c854f7..4ede724913e09 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -61,8 +61,13 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::SortOrderPushdownResult; use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use std::sync::Arc; +/// Prefetch buffer size for SortPreservingMergeExec when sort elimination +/// removes the buffering SortExec between SPM and DataSourceExec. +const SPM_PREFETCH_AFTER_SORT_ELIMINATION: usize = 16; + /// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources. /// /// See module-level documentation for details. @@ -87,8 +92,59 @@ impl PhysicalOptimizerRule for PushdownSort { } // Use transform_down to find and optimize all SortExec nodes (including nested ones) + // Also handles SPM → SortExec pattern to set prefetch when sort is eliminated plan.transform_down(|plan: Arc| { - // Check if this is a SortExec + // Pattern 1: SPM → SortExec(preserve_partitioning) + // When we eliminate the SortExec, SPM loses its memory buffer and reads + // directly from I/O-bound sources. Set a larger prefetch to pipeline I/O. + if let Some(spm) = plan.downcast_ref::() + && let Some(sort_child) = spm.input().downcast_ref::() + && sort_child.preserve_partitioning() + { + let sort_input = Arc::clone(sort_child.input()); + let required_ordering = sort_child.expr(); + match sort_input.try_pushdown_sort(required_ordering)? 
{ + SortOrderPushdownResult::Exact { inner } => { + let inner = if let Some(fetch) = sort_child.fetch() { + inner.with_fetch(Some(fetch)).unwrap_or(inner) + } else { + inner + }; + let new_spm = SortPreservingMergeExec::new( + spm.expr().clone(), + inner, + ) + .with_fetch(spm.fetch()) + .with_round_robin_repartition( + spm.enable_round_robin_repartition(), + ) + .with_prefetch(SPM_PREFETCH_AFTER_SORT_ELIMINATION); + return Ok(Transformed::yes(Arc::new(new_spm))); + } + SortOrderPushdownResult::Inexact { inner } => { + let new_sort = SortExec::new( + required_ordering.clone(), + inner, + ) + .with_fetch(sort_child.fetch()) + .with_preserve_partitioning(true); + let new_spm = SortPreservingMergeExec::new( + spm.expr().clone(), + Arc::new(new_sort), + ) + .with_fetch(spm.fetch()) + .with_round_robin_repartition( + spm.enable_round_robin_repartition(), + ); + return Ok(Transformed::yes(Arc::new(new_spm))); + } + SortOrderPushdownResult::Unsupported => { + return Ok(Transformed::no(plan)); + } + } + } + + // Pattern 2: Standalone SortExec (no SPM parent) let Some(sort_exec) = plan.downcast_ref::() else { return Ok(Transformed::no(plan)); }; diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index c2a9369c723ee..0f4ef00e2d844 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -100,6 +100,12 @@ pub struct SortPreservingMergeExec { /// /// See [`Self::with_round_robin_repartition`] for more information. enable_round_robin_repartition: bool, + /// Number of batches to prefetch from each input partition. + /// + /// When SPM reads directly from I/O-bound sources (e.g., after sort + /// elimination removes a buffering SortExec), a larger prefetch allows + /// pipelining I/O with merge computation. Defaults to 1. 
+ prefetch: usize, } impl SortPreservingMergeExec { @@ -113,6 +119,7 @@ impl SortPreservingMergeExec { fetch: None, cache: Arc::new(cache), enable_round_robin_repartition: true, + prefetch: 1, } } @@ -139,6 +146,21 @@ impl SortPreservingMergeExec { self } + /// Sets the number of batches to prefetch from each input partition. + /// + /// A larger value allows pipelining I/O with merge computation, which + /// helps when inputs are I/O-bound (e.g., reading directly from + /// DataSourceExec without a buffering SortExec in between). + pub fn with_prefetch(mut self, prefetch: usize) -> Self { + self.prefetch = prefetch; + self + } + + /// Returns the prefetch buffer size + pub fn prefetch(&self) -> usize { + self.prefetch + } + /// Input schema pub fn input(&self) -> &Arc { &self.input @@ -154,6 +176,11 @@ impl SortPreservingMergeExec { self.fetch } + /// Whether round-robin repartition is enabled + pub fn enable_round_robin_repartition(&self) -> bool { + self.enable_round_robin_repartition + } + /// Creates the cache object that stores the plan properties /// such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties( @@ -250,7 +277,8 @@ impl ExecutionPlan for SortPreservingMergeExec { metrics: self.metrics.clone(), fetch: limit, cache: Arc::clone(&self.cache), - enable_round_robin_repartition: true, + enable_round_robin_repartition: self.enable_round_robin_repartition, + prefetch: self.prefetch, })) } @@ -361,7 +389,7 @@ impl ExecutionPlan for SortPreservingMergeExec { .map(|partition| { let stream = self.input.execute(partition, Arc::clone(&context))?; - Ok(spawn_buffered(stream, 16)) + Ok(spawn_buffered(stream, self.prefetch)) }) .collect::>()?; From 63fd896986b91f18b6dc5d526309e92cbfb2e533 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Sat, 4 Apr 2026 13:04:55 +0800 Subject: [PATCH 15/17] fmt --- .../physical-optimizer/src/pushdown_sort.rs | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 4ede724913e09..1fce932b960d2 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -110,24 +110,19 @@ impl PhysicalOptimizerRule for PushdownSort { } else { inner }; - let new_spm = SortPreservingMergeExec::new( - spm.expr().clone(), - inner, - ) - .with_fetch(spm.fetch()) - .with_round_robin_repartition( - spm.enable_round_robin_repartition(), - ) - .with_prefetch(SPM_PREFETCH_AFTER_SORT_ELIMINATION); + let new_spm = + SortPreservingMergeExec::new(spm.expr().clone(), inner) + .with_fetch(spm.fetch()) + .with_round_robin_repartition( + spm.enable_round_robin_repartition(), + ) + .with_prefetch(SPM_PREFETCH_AFTER_SORT_ELIMINATION); return Ok(Transformed::yes(Arc::new(new_spm))); } SortOrderPushdownResult::Inexact { inner } => { - let new_sort = SortExec::new( - required_ordering.clone(), - inner, - ) - .with_fetch(sort_child.fetch()) - .with_preserve_partitioning(true); + let new_sort = SortExec::new(required_ordering.clone(), inner) + 
.with_fetch(sort_child.fetch()) + .with_preserve_partitioning(true); let new_spm = SortPreservingMergeExec::new( spm.expr().clone(), Arc::new(new_sort), From eb1af953831801549f1bfb52d279503cdf4f89af Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 6 Apr 2026 10:30:24 +0800 Subject: [PATCH 16/17] refactor: replace SPM prefetch with BufferExec for sort elimination MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When PushdownSort eliminates SortExec under SortPreservingMergeExec, insert BufferExec(8MB) to replace SortExec's buffering role. This is strictly better: same I/O pipelining with bounded memory, no sort computation, and no impact on other SPM usage. Revert SPM prefetch field — SPM stays at spawn_buffered(1) for all cases. BufferExec only inserted by PushdownSort when needed. --- .../physical-optimizer/src/pushdown_sort.rs | 36 +++--- .../src/sorts/sort_preserving_merge.rs | 30 +---- .../sqllogictest/test_files/sort_pushdown.slt | 110 ++++++++++++++++++ 3 files changed, 132 insertions(+), 44 deletions(-) diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 1fce932b960d2..f00be064e0501 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -60,13 +60,19 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::SortOrderPushdownResult; +use datafusion_physical_plan::buffer::BufferExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use std::sync::Arc; -/// Prefetch buffer size for SortPreservingMergeExec when sort elimination -/// removes the buffering SortExec between SPM and DataSourceExec. 
-const SPM_PREFETCH_AFTER_SORT_ELIMINATION: usize = 16; +/// Buffer capacity (in bytes) inserted between SPM and DataSourceExec when +/// sort elimination removes the buffering SortExec. +/// +/// SortExec buffers all input data in memory (with spill support) before +/// outputting sorted results. When we eliminate SortExec, SPM reads directly +/// from I/O-bound sources. BufferExec compensates by buffering batches in +/// the background, allowing I/O to pipeline with merge computation. +const BUFFER_CAPACITY_AFTER_SORT_ELIMINATION: usize = 8 * 1024 * 1024; // 8 MB /// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources. /// @@ -92,11 +98,11 @@ impl PhysicalOptimizerRule for PushdownSort { } // Use transform_down to find and optimize all SortExec nodes (including nested ones) - // Also handles SPM → SortExec pattern to set prefetch when sort is eliminated + // Also handles SPM → SortExec pattern to insert BufferExec when sort is eliminated plan.transform_down(|plan: Arc| { // Pattern 1: SPM → SortExec(preserve_partitioning) // When we eliminate the SortExec, SPM loses its memory buffer and reads - // directly from I/O-bound sources. Set a larger prefetch to pipeline I/O. + // directly from I/O-bound sources. Insert a BufferExec to compensate. if let Some(spm) = plan.downcast_ref::() && let Some(sort_child) = spm.input().downcast_ref::() && sort_child.preserve_partitioning() @@ -110,13 +116,16 @@ impl PhysicalOptimizerRule for PushdownSort { } else { inner }; + // Insert BufferExec to replace SortExec's buffering role. + // SortExec buffered all data in memory; BufferExec provides + // bounded buffering so SPM doesn't stall on I/O. 
+ let buffered: Arc = Arc::new(BufferExec::new( + inner, + BUFFER_CAPACITY_AFTER_SORT_ELIMINATION, + )); let new_spm = - SortPreservingMergeExec::new(spm.expr().clone(), inner) - .with_fetch(spm.fetch()) - .with_round_robin_repartition( - spm.enable_round_robin_repartition(), - ) - .with_prefetch(SPM_PREFETCH_AFTER_SORT_ELIMINATION); + SortPreservingMergeExec::new(spm.expr().clone(), buffered) + .with_fetch(spm.fetch()); return Ok(Transformed::yes(Arc::new(new_spm))); } SortOrderPushdownResult::Inexact { inner } => { @@ -127,10 +136,7 @@ impl PhysicalOptimizerRule for PushdownSort { spm.expr().clone(), Arc::new(new_sort), ) - .with_fetch(spm.fetch()) - .with_round_robin_repartition( - spm.enable_round_robin_repartition(), - ); + .with_fetch(spm.fetch()); return Ok(Transformed::yes(Arc::new(new_spm))); } SortOrderPushdownResult::Unsupported => { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 0f4ef00e2d844..13c28ccb10991 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -100,12 +100,6 @@ pub struct SortPreservingMergeExec { /// /// See [`Self::with_round_robin_repartition`] for more information. enable_round_robin_repartition: bool, - /// Number of batches to prefetch from each input partition. - /// - /// When SPM reads directly from I/O-bound sources (e.g., after sort - /// elimination removes a buffering SortExec), a larger prefetch allows - /// pipelining I/O with merge computation. Defaults to 1. - prefetch: usize, } impl SortPreservingMergeExec { @@ -119,7 +113,6 @@ impl SortPreservingMergeExec { fetch: None, cache: Arc::new(cache), enable_round_robin_repartition: true, - prefetch: 1, } } @@ -146,21 +139,6 @@ impl SortPreservingMergeExec { self } - /// Sets the number of batches to prefetch from each input partition. 
- /// - /// A larger value allows pipelining I/O with merge computation, which - /// helps when inputs are I/O-bound (e.g., reading directly from - /// DataSourceExec without a buffering SortExec in between). - pub fn with_prefetch(mut self, prefetch: usize) -> Self { - self.prefetch = prefetch; - self - } - - /// Returns the prefetch buffer size - pub fn prefetch(&self) -> usize { - self.prefetch - } - /// Input schema pub fn input(&self) -> &Arc { &self.input @@ -176,11 +154,6 @@ impl SortPreservingMergeExec { self.fetch } - /// Whether round-robin repartition is enabled - pub fn enable_round_robin_repartition(&self) -> bool { - self.enable_round_robin_repartition - } - /// Creates the cache object that stores the plan properties /// such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties( @@ -278,7 +251,6 @@ impl ExecutionPlan for SortPreservingMergeExec { fetch: limit, cache: Arc::clone(&self.cache), enable_round_robin_repartition: self.enable_round_robin_repartition, - prefetch: self.prefetch, })) } @@ -389,7 +361,7 @@ impl ExecutionPlan for SortPreservingMergeExec { .map(|partition| { let stream = self.input.execute(partition, Arc::clone(&context))?; - Ok(spawn_buffered(stream, self.prefetch)) + Ok(spawn_buffered(stream, 1)) }) .collect::>()?; diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 8a7df04fae3de..d399394bef8a4 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2161,6 +2161,116 @@ DROP TABLE null_src_b; statement ok DROP TABLE tf_nulls_last; +# =========================================================== +# Test G: BufferExec insertion when sort elimination removes +# SortExec under SortPreservingMergeExec. +# +# When PushdownSort eliminates SortExec(preserve_partitioning=true), +# SPM loses SortExec's memory buffer. 
A BufferExec is inserted to +# compensate, allowing I/O pipelining with merge computation. +# =========================================================== + +# Create files with reversed naming: c_low has smallest values, +# a_high has largest — alphabetical order ≠ sort key order. +statement ok +CREATE TABLE tg_src_low(id INT, value INT) AS VALUES (1, 100), (2, 200), (3, 300); + +statement ok +CREATE TABLE tg_src_mid(id INT, value INT) AS VALUES (4, 400), (5, 500), (6, 600); + +statement ok +CREATE TABLE tg_src_high(id INT, value INT) AS VALUES (7, 700), (8, 800), (9, 900), (10, 1000); + +query I +COPY (SELECT * FROM tg_src_low ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet'; +---- +3 + +query I +COPY (SELECT * FROM tg_src_mid ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet'; +---- +3 + +query I +COPY (SELECT * FROM tg_src_high ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet'; +---- +4 + +# Use target_partitions=2 so files are split across 2 groups. 
+# Files are in wrong alphabetical order → validated_output_ordering strips ordering +# → EnforceSorting adds SortExec(preserve_partitioning) + SPM +# → PushdownSort eliminates SortExec and inserts BufferExec +statement ok +SET datafusion.execution.target_partitions = 2; + +statement ok +CREATE EXTERNAL TABLE tg_buffer(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/tg_buffer/' +WITH ORDER (id ASC); + +# Test G.1: BufferExec appears between SPM and DataSourceExec +query TT +EXPLAIN SELECT * FROM tg_buffer ORDER BY id ASC; +---- +logical_plan +01)Sort: tg_buffer.id ASC NULLS LAST +02)--TableScan: tg_buffer projection=[id, value] +physical_plan +01)SortPreservingMergeExec: [id@0 ASC NULLS LAST] +02)--BufferExec: capacity=8388608 +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# Verify correctness +query II +SELECT * FROM tg_buffer ORDER BY id ASC; +---- +1 100 +2 200 +3 300 +4 400 +5 500 +6 600 +7 700 +8 800 +9 900 +10 1000 + +# Test G.2: LIMIT query with BufferExec +query TT +EXPLAIN SELECT * FROM tg_buffer ORDER BY id ASC LIMIT 3; +---- +logical_plan +01)Sort: tg_buffer.id ASC NULLS LAST, fetch=3 +02)--TableScan: tg_buffer projection=[id, value] +physical_plan +01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3 +02)--BufferExec: capacity=8388608 +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +query II +SELECT * FROM tg_buffer ORDER BY id ASC LIMIT 3; +---- +1 100 +2 200 +3 300 + +# Cleanup Test G +statement ok +DROP TABLE tg_src_low; + +statement ok +DROP TABLE tg_src_mid; + +statement ok +DROP TABLE tg_src_high; + +statement ok +DROP TABLE tg_buffer; + # Reset settings (SLT runner uses target_partitions=4, not system default) statement ok SET datafusion.execution.target_partitions = 4; From 790cbce49faa827ec11830074cd2a00f80016da0 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 6 Apr 2026 12:53:41 +0800 Subject: [PATCH 17/17] perf: increase BufferExec capacity from 8MB to 64MB for sort elimination 8MB was too small for wide-row full scans (Q3: SELECT * with 16 columns), causing SPM to stall on I/O. 64MB per partition is still strictly less than the SortExec it replaces (which buffers entire partition in memory). BufferExec integrates with MemoryPool so it won't cause OOM. --- .../physical-optimizer/src/pushdown_sort.rs | 19 ++++++++++++------- .../sqllogictest/test_files/sort_pushdown.slt | 4 ++-- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index f00be064e0501..308e91d0df145 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -65,14 +65,19 @@ use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use std::sync::Arc; -/// Buffer capacity (in bytes) inserted between SPM and DataSourceExec when -/// sort elimination removes the buffering SortExec. 
+/// Per-partition buffer capacity (in bytes) inserted between SPM and +/// DataSourceExec when sort elimination removes the buffering SortExec. /// -/// SortExec buffers all input data in memory (with spill support) before -/// outputting sorted results. When we eliminate SortExec, SPM reads directly -/// from I/O-bound sources. BufferExec compensates by buffering batches in -/// the background, allowing I/O to pipeline with merge computation. -const BUFFER_CAPACITY_AFTER_SORT_ELIMINATION: usize = 8 * 1024 * 1024; // 8 MB +/// SortExec buffers all input data in memory (potentially GB per partition) +/// before outputting sorted results. When we eliminate SortExec, SPM reads +/// directly from I/O-bound sources. BufferExec compensates with bounded +/// buffering, allowing I/O to pipeline with merge computation. +/// +/// This is strictly less memory than the SortExec it replaces, and only +/// inserted when PushdownSort eliminates a SortExec — no impact on other +/// query plans. BufferExec also integrates with MemoryPool, so it respects +/// the global memory limit and won't cause OOM. +const BUFFER_CAPACITY_AFTER_SORT_ELIMINATION: usize = 64 * 1024 * 1024; // 64 MB /// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources. 
/// diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index d399394bef8a4..e9d4e221e1ddb 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2221,7 +2221,7 @@ logical_plan 02)--TableScan: tg_buffer projection=[id, value] physical_plan 01)SortPreservingMergeExec: [id@0 ASC NULLS LAST] -02)--BufferExec: capacity=8388608 +02)--BufferExec: capacity=67108864 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet # Verify correctness @@ -2248,7 +2248,7 @@ logical_plan 02)--TableScan: tg_buffer projection=[id, value] physical_plan 01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3 -02)--BufferExec: capacity=8388608 +02)--BufferExec: capacity=67108864 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet query II