Merged
13 changes: 13 additions & 0 deletions datafusion/common/src/config.rs
@@ -557,6 +557,19 @@ config_namespace! {
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

/// Maximum buffer capacity (in bytes) per partition for BufferExec
/// inserted during sort pushdown optimization.
///
/// When PushdownSort eliminates a SortExec under SortPreservingMergeExec,
/// a BufferExec is inserted to replace SortExec's buffering role. This
/// prevents I/O stalls by allowing the scan to run ahead of the merge.
///
/// This uses strictly less memory than the SortExec it replaces (which
/// buffers the entire partition). The buffer respects the global memory
/// pool limit. Setting this to a large value is safe — actual memory
/// usage is bounded by partition size and global memory limits.
pub sort_pushdown_buffer_capacity: usize, default = 1024 * 1024 * 1024
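The scan-ahead behavior this option budgets can be illustrated with a plain bounded channel: the producer (scan) runs ahead of the consumer (merge) up to the buffer capacity, so merge work overlaps with I/O. This is a minimal sketch of the idea, not BufferExec's actual implementation:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // A bounded channel plays the role BufferExec plays between the scan
    // and the merge: the sender may run up to `capacity` batches ahead of
    // the receiver before it blocks, so slow consumer steps overlap with
    // I/O instead of strictly serializing after it.
    let capacity = 4;
    let (tx, rx) = sync_channel::<i32>(capacity);

    let scan = thread::spawn(move || {
        for batch in 0..16 {
            // Blocks only once the buffer already holds `capacity` batches.
            tx.send(batch).unwrap();
        }
        // Dropping `tx` here closes the channel, ending the receiver's loop.
    });

    let merged: Vec<i32> = rx.iter().collect();
    scan.join().unwrap();
    assert_eq!(merged, (0..16).collect::<Vec<i32>>());
}
```

The config value above bounds how far ahead the scan may run, analogous to `capacity` here, while the global memory pool still caps total usage.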
Contributor
This PR increases the buffer size because:

64 MB was too small for wide-row scans (16-column TPC-H `SELECT *` queries showed I/O stalls).

To be clear to anyone reading this, what will be on main is still better than 53.0.0, because prior to #21182 DataFusion would have sorted the entire partition (rather than just buffering it).

A fixed size like this is likely to buffer more than required for narrow rows.

I suspect a better solution than a fixed-size buffer would be some calculation based on the actual size of the data (e.g. the number of rows to buffer). However, that is tricky to compute / constrain for memory when large strings are involved.

We would probably need both a row limit and a memory cap, and pick the smaller of the two.

We can perhaps do this as a follow-on issue/PR.

Contributor Author
Good point! A dynamic approach with both a row limit and a memory cap (whichever is reached first) would adapt better to different row widths. Created #21440 to track this.
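The row-limit-plus-memory-cap idea from this thread can be sketched in a few lines. The function name and the fixed average-row-size estimate are hypothetical illustrations, not DataFusion APIs; estimating row size is exactly the hard part when large strings are involved:

```rust
/// Hypothetical helper for the follow-up discussed above: bound the buffer
/// by both a row count and a byte budget, and take whichever is smaller.
/// `avg_row_bytes` stands in for a real per-row size estimate.
fn effective_buffer_bytes(
    row_limit: usize,
    avg_row_bytes: usize,
    memory_cap_bytes: usize,
) -> usize {
    // Bytes implied by the row limit; saturate rather than overflow.
    let row_limit_bytes = row_limit.saturating_mul(avg_row_bytes);
    row_limit_bytes.min(memory_cap_bytes)
}

fn main() {
    // Narrow rows: the row limit binds (1M rows x 64 B = 64 MB < 1 GiB cap).
    assert_eq!(effective_buffer_bytes(1_000_000, 64, 1 << 30), 64_000_000);
    // Wide rows (e.g. large strings): the memory cap binds.
    assert_eq!(effective_buffer_bytes(1_000_000, 4_096, 1 << 30), 1 << 30);
}
```

With this shape, narrow-row scans stop over-buffering while wide-row scans stay under the byte cap, which is the adaptivity the fixed default cannot provide.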


/// Maximum size in bytes for individual spill files before rotating to a new file.
///
/// When operators spill data to disk (e.g., RepartitionExec), they write
22 changes: 4 additions & 18 deletions datafusion/physical-optimizer/src/pushdown_sort.rs
@@ -65,20 +65,6 @@ use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use std::sync::Arc;

/// Per-partition buffer capacity (in bytes) inserted between SPM and
/// DataSourceExec when sort elimination removes the buffering SortExec.
///
/// SortExec buffers all input data in memory (potentially GB per partition)
/// before outputting sorted results. When we eliminate SortExec, SPM reads
/// directly from I/O-bound sources. BufferExec compensates with bounded
/// buffering, allowing I/O to pipeline with merge computation.
///
/// This is strictly less memory than the SortExec it replaces, and only
/// inserted when PushdownSort eliminates a SortExec — no impact on other
/// query plans. BufferExec also integrates with MemoryPool, so it respects
/// the global memory limit and won't cause OOM.
const BUFFER_CAPACITY_AFTER_SORT_ELIMINATION: usize = 64 * 1024 * 1024; // 64 MB

/// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources.
///
/// See module-level documentation for details.
@@ -102,6 +88,8 @@ impl PhysicalOptimizerRule for PushdownSort {
return Ok(plan);
}

let buffer_capacity = config.execution.sort_pushdown_buffer_capacity;

// Use transform_down to find and optimize all SortExec nodes (including nested ones)
// Also handles SPM → SortExec pattern to insert BufferExec when sort is eliminated
plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
@@ -124,10 +112,8 @@
// Insert BufferExec to replace SortExec's buffering role.
// SortExec buffered all data in memory; BufferExec provides
// bounded buffering so SPM doesn't stall on I/O.
let buffered: Arc<dyn ExecutionPlan> = Arc::new(BufferExec::new(
inner,
BUFFER_CAPACITY_AFTER_SORT_ELIMINATION,
));
let buffered: Arc<dyn ExecutionPlan> =
Arc::new(BufferExec::new(inner, buffer_capacity));
let new_spm =
SortPreservingMergeExec::new(spm.expr().clone(), buffered)
.with_fetch(spm.fetch());
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -270,6 +270,7 @@ datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000
datafusion.execution.skip_physical_aggregate_schema_check false
datafusion.execution.soft_max_rows_per_output_file 50000000
datafusion.execution.sort_in_place_threshold_bytes 1048576
datafusion.execution.sort_pushdown_buffer_capacity 1073741824
datafusion.execution.sort_spill_reservation_bytes 10485760
datafusion.execution.spill_compression uncompressed
datafusion.execution.split_file_groups_by_statistics false
@@ -413,6 +414,7 @@ datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number
datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.
datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
datafusion.execution.sort_pushdown_buffer_capacity 1073741824 Maximum buffer capacity (in bytes) per partition for BufferExec inserted during sort pushdown optimization. When PushdownSort eliminates a SortExec under SortPreservingMergeExec, a BufferExec is inserted to replace SortExec's buffering role. This prevents I/O stalls by allowing the scan to run ahead of the merge. This uses strictly less memory than the SortExec it replaces (which buffers the entire partition). The buffer respects the global memory pool limit. Setting this to a large value is safe — actual memory usage is bounded by partition size and global memory limits.
datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
datafusion.execution.spill_compression uncompressed Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed.
datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/sort_pushdown.slt
@@ -2221,7 +2221,7 @@ logical_plan
02)--TableScan: tg_buffer projection=[id, value]
physical_plan
01)SortPreservingMergeExec: [id@0 ASC NULLS LAST]
02)--BufferExec: capacity=67108864
02)--BufferExec: capacity=1073741824
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet

# Verify correctness
@@ -2248,7 +2248,7 @@ logical_plan
02)--TableScan: tg_buffer projection=[id, value]
physical_plan
01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3
02)--BufferExec: capacity=67108864
02)--BufferExec: capacity=1073741824
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet

query II
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -118,6 +118,7 @@ The following configuration settings are available:
| datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. |
| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
| datafusion.execution.sort_pushdown_buffer_capacity | 1073741824 | Maximum buffer capacity (in bytes) per partition for BufferExec inserted during sort pushdown optimization. When PushdownSort eliminates a SortExec under SortPreservingMergeExec, a BufferExec is inserted to replace SortExec's buffering role. This prevents I/O stalls by allowing the scan to run ahead of the merge. This uses strictly less memory than the SortExec it replaces (which buffers the entire partition). The buffer respects the global memory pool limit. Setting this to a large value is safe — actual memory usage is bounded by partition size and global memory limits. |
| datafusion.execution.max_spill_file_size_bytes | 134217728 | Maximum size in bytes for individual spill files before rotating to a new file. When operators spill data to disk (e.g., RepartitionExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation as files can be deleted once fully consumed. Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators may create spill files larger than the limit. Default: 128 MB |
| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |
| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. |