diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 806ed1d4b45e..81f6b2edac13 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -129,32 +129,47 @@ impl Repartition { } } -/// Recursively visits all `plan`s puts and then optionally adds a -/// `RepartitionExec` at the output of `plan` to match -/// `target_partitions` in an attempt to increase the overall parallelism. +/// Recursively attempts to increase the overall parallelism of the +/// plan, while respecting ordering, by adding a `RepartitionExec` at +/// the output of `plan` if it would help parallelism and not destroy +/// any possibly useful ordering. /// -/// It does so using depth first scan of the tree, and repartitions +/// It does so using a depth first scan of the tree, and repartitions /// any plan that: /// /// 1. Has fewer partitions than `target_partitions` /// /// 2. Has a direct parent that `benefits_from_input_partitioning` /// -/// if `can_reorder` is false, means that the output of this node -/// can not be reordered as as the final output is relying on that order +/// 3. Does not destroy any existing sort order if the parent is +/// relying on it. /// -/// If 'would_benefit` is false, the upstream operator doesn't -/// benefit from additional repartition +/// if `can_reorder` is false, it means the parent node of `plan` is +/// trying to take advantage of the output sort order of plan, so it +/// should not be repartitioned if doing so would destroy the output +/// sort order. /// +/// (Parent) - If can_reorder is false, means this parent node is +/// trying to use the output sort order of this plan. 
If true +/// means parent doesn't care about sort order +/// +/// (plan) - We are deciding whether to add a repartition above here +/// +/// (children) - Recursively visit all children first +/// +/// If `would_benefit` is true, the upstream operator would benefit +/// from additional partitions and thus repartitioning is considered. +/// +/// if `is_root` is true, no repartition is added. fn optimize_partitions( target_partitions: usize, plan: Arc, + is_root: bool, can_reorder: bool, would_benefit: bool, ) -> Result> { // Recurse into children bottom-up (attempt to repartition as // early as possible) - let new_plan = if plan.children().is_empty() { // leaf node - don't replace children plan @@ -163,10 +178,34 @@ fn optimize_partitions( .children() .iter() .map(|child| { + // does plan itself (not parent) require its input to + // be sorted in some way? + let required_input_ordering = + plan_has_required_input_ordering(plan.as_ref()); + + let can_reorder_child = if can_reorder { + // parent of `plan` will not use any particular order + + // if `plan` itself doesn't need order OR + !required_input_ordering || + // child has no order to preserve + child.output_ordering().is_none() + } else { + // parent would like to use the `plan`'s output + // order. 
+ + // if `plan` doesn't maintain the input order and + // doesn't need the child's output order itself + (!plan.maintains_input_order() && !required_input_ordering) || + // child has no ordering to preserve + child.output_ordering().is_none() + }; + optimize_partitions( target_partitions, child.clone(), - can_reorder || child.output_ordering().is_none(), + false, // child is not root + can_reorder_child, plan.benefits_from_input_partitioning(), ) }) @@ -191,6 +230,11 @@ fn optimize_partitions( && stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true); } + // don't repartition root of the plan + if is_root { + could_repartition = false; + } + if would_benefit && could_repartition && can_reorder { Ok(Arc::new(RepartitionExec::try_new( new_plan, @@ -201,6 +245,14 @@ fn optimize_partitions( } } +/// Returns true if `plan` requires any of its inputs to be sorted in some +/// way for correctness. If this is true, its input should not be +/// repartitioned if doing so would destroy the required order. +fn plan_has_required_input_ordering(plan: &dyn ExecutionPlan) -> bool { + // NB: checking `is_empty()` is not the right check! 
+ plan.required_input_ordering().iter().any(Option::is_some) +} + impl PhysicalOptimizerRule for Repartition { fn optimize( &self, @@ -213,11 +265,15 @@ impl PhysicalOptimizerRule for Repartition { if !enabled || target_partitions == 1 { Ok(plan) } else { + let is_root = true; + let can_reorder = plan.output_ordering().is_none(); + let would_benefit = false; optimize_partitions( target_partitions, plan.clone(), - plan.output_ordering().is_none(), - false, + is_root, + can_reorder, + would_benefit, ) } } @@ -230,6 +286,13 @@ impl PhysicalOptimizerRule for Repartition { true } } + +#[cfg(test)] +#[ctor::ctor] +fn init() { + let _ = env_logger::try_init(); +} + #[cfg(test)] mod tests { use arrow::compute::SortOptions; @@ -251,12 +314,13 @@ mod tests { use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; - use crate::physical_plan::{displayable, Statistics}; + use crate::physical_plan::{displayable, DisplayFormatType, Statistics}; fn schema() -> SchemaRef { Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)])) } + /// Create a non sorted parquet exec fn parquet_exec() -> Arc { Arc::new(ParquetExec::new( FileScanConfig { @@ -275,6 +339,30 @@ mod tests { )) } + // Created a sorted parquet exec + fn parquet_exec_sorted() -> Arc { + let sort_exprs = vec![PhysicalSortExpr { + expr: col("c1", &schema()).unwrap(), + options: SortOptions::default(), + }]; + + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema(), + file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: Some(sort_exprs), + infinite_source: false, + }, + None, + None, + )) + } + fn sort_preserving_merge_exec( input: Arc, ) -> Arc { @@ -350,6 +438,14 @@ mod tests { 
)) } + fn union_exec(input: Vec>) -> Arc { + Arc::new(UnionExec::new(input)) + } + + fn sort_required_exec(input: Arc) -> Arc { + Arc::new(SortRequiredExec::new(input)) + } + fn trim_plan_display(plan: &str) -> Vec<&str> { plan.split('\n') .map(|s| s.trim()) @@ -550,8 +646,7 @@ mod tests { #[test] fn repartition_ignores_union() -> Result<()> { - let plan: Arc = - Arc::new(UnionExec::new(vec![parquet_exec(); 5])); + let plan = union_exec(vec![parquet_exec(); 5]); let expected = &[ "UnionExec", @@ -568,9 +663,11 @@ mod tests { } #[test] - fn repartition_ignores_sort_preserving_merge() -> Result<()> { + fn repartition_through_sort_preserving_merge() -> Result<()> { + // sort preserving merge with non-sorted input let plan = sort_preserving_merge_exec(parquet_exec()); + // need repartiton and resort as the data was not sorted correctly let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", "SortExec: [c1@0 ASC]", @@ -583,9 +680,94 @@ mod tests { } #[test] - fn repartition_does_not_repartition_transitively() -> Result<()> { + fn repartition_ignores_sort_preserving_merge() -> Result<()> { + // sort preserving merge already sorted input, + let plan = sort_preserving_merge_exec(parquet_exec_sorted()); + + // should not repartition / sort (as the data was already sorted) + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { + // 2 sorted parquet files unioned (partitions are concatenated, sort is preserved) + let input = union_exec(vec![parquet_exec_sorted(); 2]); + let plan = sort_preserving_merge_exec(input); + + // should not repartition / sort (as the data was already sorted) + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + "UnionExec", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_does_not_destroy_sort() -> Result<()> { + // SortRequired + // Parquet(sorted) + + let plan = sort_required_exec(parquet_exec_sorted()); + + // should not repartition as doing so destroys the necessary sort order + let expected = &[ + "SortRequiredExec", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { + // model a more complicated scenario where one child of a union can be repartitioned for performance + // but the other can not be + // + // Union + // SortRequired + // Parquet(sorted) + // Filter + // Parquet(unsorted) + + let input1 = sort_required_exec(parquet_exec_sorted()); + let input2 = filter_exec(parquet_exec()); + let plan = union_exec(vec![input1, input2]); + + // should not repartition below the SortRequired as that + // destroys the sort order but should still repartition for + // FilterExec + let expected = &[ + "UnionExec", + // union input 1: no repartitioning + "SortRequiredExec", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + // union input 2: should repartition + "FilterExec: c1@0", + "RepartitionExec: partitioning=RoundRobinBatch(10)", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_transitively_with_projection() -> Result<()> { + // non sorted input let plan = sort_preserving_merge_exec(projection_exec(parquet_exec())); + // needs to repartition / sort as the data was not sorted correctly let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", "SortExec: 
[c1@0 ASC]", @@ -598,6 +780,22 @@ mod tests { Ok(()) } + #[test] + fn repartition_ignores_transitively_with_projection() -> Result<()> { + // sorted input + let plan = sort_preserving_merge_exec(projection_exec(parquet_exec_sorted())); + + // data should not be repartitioned / resorted + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + "ProjectionExec: expr=[c1@0 as c1]", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + #[test] fn repartition_transitively_past_sort_with_projection() -> Result<()> { let plan = @@ -655,4 +853,73 @@ mod tests { assert_optimized!(expected, plan); Ok(()) } + + /// Models operators like BoundedWindowExec that require an input + /// ordering but is easy to construct + #[derive(Debug)] + struct SortRequiredExec { + input: Arc, + } + + impl SortRequiredExec { + fn new(input: Arc) -> Self { + Self { input } + } + } + + impl ExecutionPlan for SortRequiredExec { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> SchemaRef { + self.input.schema() + } + + fn output_partitioning(&self) -> crate::physical_plan::Partitioning { + self.input.output_partitioning() + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + // model that it requires the output ordering of its input + fn required_input_ordering(&self) -> Vec> { + vec![self.input.output_ordering()] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + assert_eq!(children.len(), 1); + let child = children.pop().unwrap(); + Ok(Arc::new(Self::new(child))) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unreachable!(); + } + + fn statistics(&self) -> Statistics { + self.input.statistics() + } + + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut 
std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "SortRequiredExec") + } + } } diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 52463b4bdc09..703a13a1cb1d 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -486,15 +486,19 @@ fn check_alignment( #[cfg(test)] mod tests { use super::*; + use crate::datasource::listing::PartitionedFile; + use crate::datasource::object_store::ObjectStoreUrl; use crate::physical_plan::displayable; + use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; + use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::create_window_expr; use crate::prelude::SessionContext; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use datafusion_common::Result; + use datafusion_common::{Result, Statistics}; use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction}; use datafusion_physical_expr::expressions::{col, NotExpr}; use datafusion_physical_expr::PhysicalSortExpr; @@ -589,71 +593,84 @@ mod tests { Ok(()) } + /// Runs the sort enforcement optimizer and asserts the plan + /// against the original and expected plans + /// + /// `$EXPECTED_PLAN_LINES`: expected display of the input plan (before optimization) + /// `$EXPECTED_OPTIMIZED_PLAN_LINES`: expected display of the optimized plan + /// `$PLAN`: the plan to optimize + /// + macro_rules! 
assert_optimized { + ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + + let physical_plan = $PLAN; + let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + + let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES + .iter().map(|s| *s).collect(); + + assert_eq!( + expected_plan_lines, actual, + "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + let expected_optimized_lines: Vec<&str> = $EXPECTED_OPTIMIZED_PLAN_LINES + .iter().map(|s| *s).collect(); + + // Run the actual optimizer + let optimized_physical_plan = + EnforceSorting::new().optimize(physical_plan, state.config_options())?; + + let formatted = displayable(optimized_physical_plan.as_ref()) + .indent() + .to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected_optimized_lines, actual, + "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + }; + } + #[tokio::test] async fn test_remove_unnecessary_sort() -> Result<()> { - let session_ctx = SessionContext::new(); - let state = session_ctx.state(); let schema = create_test_schema()?; - let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) - as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("non_nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), - }]; - let sort_exec = Arc::new(SortExec::try_new(sort_exprs, source, None)?) - as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), - }]; - let physical_plan = Arc::new(SortExec::try_new(sort_exprs, sort_exec, None)?) 
- as Arc; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let expected = { - vec![ - "SortExec: [nullable_col@0 ASC]", - " SortExec: [non_nullable_col@1 ASC]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - let actual_len = actual.len(); - let actual_trim_last = &actual[..actual_len - 1]; - assert_eq!( - expected, actual_trim_last, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - let optimized_physical_plan = - EnforceSorting::new().optimize(physical_plan, state.config_options())?; - let formatted = displayable(optimized_physical_plan.as_ref()) - .indent() - .to_string(); - let expected = { vec!["SortExec: [nullable_col@0 ASC]"] }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - let actual_len = actual.len(); - let actual_trim_last = &actual[..actual_len - 1]; - assert_eq!( - expected, actual_trim_last, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); + let source = memory_exec(&schema); + let input = sort_exec(vec![sort_expr("non_nullable_col", &schema)], source); + let physical_plan = sort_exec(vec![sort_expr("nullable_col", &schema)], input); + + let expected_input = vec![ + "SortExec: [nullable_col@0 ASC]", + " SortExec: [non_nullable_col@1 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + let expected_optimized = vec![ + "SortExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) } #[tokio::test] async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> { - let session_ctx = SessionContext::new(); - let state = session_ctx.state(); let schema = create_test_schema()?; - let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) 
- as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("non_nullable_col", source.schema().as_ref()).unwrap(), - options: SortOptions { + let source = memory_exec(&schema); + + let sort_exprs = vec![sort_expr_options( + "non_nullable_col", + &source.schema(), + SortOptions { descending: true, nulls_first: true, }, - }]; - let sort_exec = Arc::new(SortExec::try_new(sort_exprs.clone(), source, None)?) - as Arc; + )]; + let sort = sort_exec(sort_exprs.clone(), source); + let window_agg_exec = Arc::new(WindowAggExec::try_new( vec![create_window_expr( &WindowFunction::AggregateFunction(AggregateFunction::Count), @@ -664,32 +681,33 @@ mod tests { Arc::new(WindowFrame::new(true)), schema.as_ref(), )?], - sort_exec.clone(), - sort_exec.schema(), + sort.clone(), + sort.schema(), vec![], Some(sort_exprs), )?) as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("non_nullable_col", window_agg_exec.schema().as_ref()).unwrap(), - options: SortOptions { + + let sort_exprs = vec![sort_expr_options( + "non_nullable_col", + &window_agg_exec.schema(), + SortOptions { descending: false, nulls_first: false, }, - }]; - let sort_exec = Arc::new(SortExec::try_new( - sort_exprs.clone(), - window_agg_exec, - None, - )?) as Arc; + )]; + + let sort = sort_exec(sort_exprs.clone(), window_agg_exec); + // Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before - let filter_exec = Arc::new(FilterExec::try_new( + let filter = filter_exec( Arc::new(NotExpr::new( col("non_nullable_col", schema.as_ref()).unwrap(), )), - sort_exec, - )?) 
as Arc; + sort, + ); + // let filter_exec = sort_exec; - let window_agg_exec = Arc::new(WindowAggExec::try_new( + let physical_plan = Arc::new(WindowAggExec::try_new( vec![create_window_expr( &WindowFunction::AggregateFunction(AggregateFunction::Count), "count".to_owned(), @@ -699,214 +717,221 @@ mod tests { Arc::new(WindowFrame::new(true)), schema.as_ref(), )?], - filter_exec.clone(), - filter_exec.schema(), + filter.clone(), + filter.schema(), vec![], Some(sort_exprs), )?) as Arc; - let physical_plan = window_agg_exec; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let expected = { - vec![ - "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", - " FilterExec: NOT non_nullable_col@1", - " SortExec: [non_nullable_col@1 ASC NULLS LAST]", - " WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", - " SortExec: [non_nullable_col@1 DESC]", - " MemoryExec: partitions=0, partition_sizes=[]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - let optimized_physical_plan = - EnforceSorting::new().optimize(physical_plan, state.config_options())?; - let formatted = displayable(optimized_physical_plan.as_ref()) - .indent() - .to_string(); - let expected = { - vec![ - "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]", - " FilterExec: NOT non_nullable_col@1", - " WindowAggExec: wdw=[count: 
Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", - " SortExec: [non_nullable_col@1 DESC]", - " MemoryExec: partitions=0, partition_sizes=[]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); + + let expected_input = vec![ + "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", + " FilterExec: NOT non_nullable_col@1", + " SortExec: [non_nullable_col@1 ASC NULLS LAST]", + " WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", + " SortExec: [non_nullable_col@1 DESC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + + let expected_optimized = vec![ + "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]", + " FilterExec: NOT non_nullable_col@1", + " WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", + " SortExec: [non_nullable_col@1 DESC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) } #[tokio::test] async fn test_add_required_sort() -> Result<()> { - let session_ctx = 
SessionContext::new(); - let state = session_ctx.state(); let schema = create_test_schema()?; - let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) - as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), - }]; - let physical_plan = Arc::new(SortPreservingMergeExec::new(sort_exprs, source)) - as Arc; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let expected = { vec!["SortPreservingMergeExec: [nullable_col@0 ASC]"] }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - let actual_len = actual.len(); - let actual_trim_last = &actual[..actual_len - 1]; - assert_eq!( - expected, actual_trim_last, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - let optimized_physical_plan = - EnforceSorting::new().optimize(physical_plan, state.config_options())?; - let formatted = displayable(optimized_physical_plan.as_ref()) - .indent() - .to_string(); - let expected = { - vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: [nullable_col@0 ASC]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - let actual_len = actual.len(); - let actual_trim_last = &actual[..actual_len - 1]; - assert_eq!( - expected, actual_trim_last, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); + let source = memory_exec(&schema); + + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + + let physical_plan = sort_preserving_merge_exec(sort_exprs, source); + + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) } #[tokio::test] async fn 
test_remove_unnecessary_sort1() -> Result<()> { - let session_ctx = SessionContext::new(); - let state = session_ctx.state(); let schema = create_test_schema()?; - let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) - as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), - }]; - let sort_exec = Arc::new(SortExec::try_new(sort_exprs.clone(), source, None)?) - as Arc; - let sort_preserving_merge_exec = - Arc::new(SortPreservingMergeExec::new(sort_exprs, sort_exec)) - as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), - }]; - let sort_exec = Arc::new(SortExec::try_new( - sort_exprs.clone(), - sort_preserving_merge_exec, - None, - )?) as Arc; - let sort_preserving_merge_exec = - Arc::new(SortPreservingMergeExec::new(sort_exprs, sort_exec)) - as Arc; - let physical_plan = sort_preserving_merge_exec; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let expected = { - vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: [nullable_col@0 ASC]", - " SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: [nullable_col@0 ASC]", - " MemoryExec: partitions=0, partition_sizes=[]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - let optimized_physical_plan = - EnforceSorting::new().optimize(physical_plan, state.config_options())?; - let formatted = displayable(optimized_physical_plan.as_ref()) - .indent() - .to_string(); - let expected = { - vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: [nullable_col@0 ASC]", - " MemoryExec: partitions=0, partition_sizes=[]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - 
assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + let spm = sort_preserving_merge_exec(sort_exprs, sort); + + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), spm); + let physical_plan = sort_preserving_merge_exec(sort_exprs, sort); + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) } #[tokio::test] async fn test_change_wrong_sorting() -> Result<()> { - let session_ctx = SessionContext::new(); - let state = session_ctx.state(); let schema = create_test_schema()?; - let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) 
- as Arc; + let source = memory_exec(&schema); let sort_exprs = vec![ - PhysicalSortExpr { - expr: col("nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort = sort_exec(vec![sort_exprs[0].clone()], source); + let physical_plan = sort_preserving_merge_exec(sort_exprs, sort); + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " SortExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_union_inputs_sorted() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source1); + + let source2 = parquet_exec_sorted(&schema, sort_exprs.clone()); + + let union = union_exec(vec![source2, sort]); + let physical_plan = sort_preserving_merge_exec(sort_exprs, union); + + // one input to the union is already sorted, one is not. 
+ let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // should not add a sort at the output of the union, input plan should not be changed + let expected_optimized = expected_input.clone(); + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + /// make PhysicalSortExpr with default options + fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { + sort_expr_options(name, schema, SortOptions::default()) + } + + /// PhysicalSortExpr with specified options + fn sort_expr_options( + name: &str, + schema: &Schema, + options: SortOptions, + ) -> PhysicalSortExpr { + PhysicalSortExpr { + expr: col(name, schema).unwrap(), + options, + } + } + + fn memory_exec(schema: &SchemaRef) -> Arc { + Arc::new(MemoryExec::try_new(&[], schema.clone(), None).unwrap()) + } + + fn sort_exec( + sort_exprs: impl IntoIterator, + input: Arc, + ) -> Arc { + let sort_exprs = sort_exprs.into_iter().collect(); + Arc::new(SortExec::try_new(sort_exprs, input, None).unwrap()) + } + + fn sort_preserving_merge_exec( + sort_exprs: impl IntoIterator, + input: Arc, + ) -> Arc { + let sort_exprs = sort_exprs.into_iter().collect(); + Arc::new(SortPreservingMergeExec::new(sort_exprs, input)) + } + + fn filter_exec( + predicate: Arc, + input: Arc, + ) -> Arc { + Arc::new(FilterExec::try_new(predicate, input).unwrap()) + } + + /// Create a non sorted parquet exec + fn parquet_exec(schema: &SchemaRef) -> Arc { + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema.clone(), + file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], + statistics: 
Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, }, - PhysicalSortExpr { - expr: col("non_nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), + None, + None, + )) + } + + // Created a sorted parquet exec + fn parquet_exec_sorted( + schema: &SchemaRef, + sort_exprs: impl IntoIterator, + ) -> Arc { + let sort_exprs = sort_exprs.into_iter().collect(); + + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema.clone(), + file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: Some(sort_exprs), + infinite_source: false, }, - ]; - let sort_exec = Arc::new(SortExec::try_new( - vec![sort_exprs[0].clone()], - source, None, - )?) as Arc; - let sort_preserving_merge_exec = - Arc::new(SortPreservingMergeExec::new(sort_exprs, sort_exec)) - as Arc; - let physical_plan = sort_preserving_merge_exec; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let expected = { - vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", - " SortExec: [nullable_col@0 ASC]", - " MemoryExec: partitions=0, partition_sizes=[]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - let optimized_physical_plan = - EnforceSorting::new().optimize(physical_plan, state.config_options())?; - let formatted = displayable(optimized_physical_plan.as_ref()) - .indent() - .to_string(); - let expected = { - vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", - " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", - " MemoryExec: partitions=0, partition_sizes=[]", - ] - }; - let actual: 
Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - Ok(()) + None, + )) + } + + fn union_exec(input: Vec>) -> Arc { + Arc::new(UnionExec::new(input)) } } diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index b9a0d9707ee1..0417814f3c80 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -136,7 +136,12 @@ pub trait ExecutionPlan: Debug + Send + Sync { } /// Specifies the ordering requirements for all of the children - /// For each child, it's the local ordering requirement within each partition rather than the global ordering + /// For each child, it's the local ordering requirement within + /// each partition rather than the global ordering + /// + /// NOTE that checking `!is_empty()` does **not** check for a + /// required input ordering. Instead, the correct check is that at + /// least one entry must be `Some` fn required_input_ordering(&self) -> Vec> { vec![None; self.children().len()] } diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index 921a0d99f03e..a0fca8066115 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -247,6 +247,30 @@ impl ExecutionPlan for UnionExec { } } + fn maintains_input_order(&self) -> bool { + let first_input_ordering = self.inputs[0].output_ordering(); + // If the Union is not partition aware and all the input + // ordering spec strictly equal with the first_input_ordering, + // then the `UnionExec` maintains the input order + // + // It might be too strict here in the case that the input + // ordering are compatible but not exactly the same. 
See + // comments in output_ordering + !self.partition_aware + && first_input_ordering.is_some() + && self + .inputs + .iter() + .map(|plan| plan.output_ordering()) + .all(|ordering| { + ordering.is_some() + && sort_expr_list_eq_strict_order( + ordering.unwrap(), + first_input_ordering.unwrap(), + ) + }) + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 0cc65abf0850..1e4f081f4789 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -86,11 +86,10 @@ async fn explain_analyze_baseline_metrics() { "CoalesceBatchesExec: target_batch_size=4096", "metrics=[output_rows=5, elapsed_compute" ); - // The number of output rows becomes less after changing the global sort to the local sort with limit push down assert_metrics!( &formatted, "CoalescePartitionsExec", - "metrics=[output_rows=3, elapsed_compute=" + "metrics=[output_rows=5, elapsed_compute=" ); assert_metrics!( &formatted,