diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index aab51efa1e34..324d9fbcc104 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -3115,19 +3115,18 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti ) -> Result<()> { assert_snapshot!( union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?, - @r#" + @r" AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted - SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] - CoalescePartitionsExec - AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] - UnionExec - DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + SortPreservingMergeExec: [id@0 ASC NULLS LAST] + AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + UnionExec + DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet - "#); + "); Ok(()) } -#[ignore] // See https://github.com/apache/datafusion/issues/18380 #[tokio::test] // Test with `repartition_sorts` enabled to preserve pre-sorted partitions and avoid resorting async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true( @@ -3144,52 +3143,6 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet "#); - // 💥 Doesn't pass, and generates this plan: - // - // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted - // SortPreservingMergeExec: [id@0 ASC NULLS LAST] - // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] - // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] - // UnionExec - // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet - // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet - // - // - // === Excerpt from the verbose explain === - // - // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - // | plan_type | plan | - // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - // | initial_physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - // | | UnionExec | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | - // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | - // ... - // | physical_plan after EnforceDistribution | OutputRequirementExec: order_by=[], dist_by=Unspecified | - // | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | - // | | CoalescePartitionsExec | - // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - // | | UnionExec | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | - // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | - // | | | - // | physical_plan after CombinePartialFinalAggregate | SAME TEXT AS ABOVE - // | | | - // | physical_plan after EnforceSorting | OutputRequirementExec: order_by=[], dist_by=Unspecified | - // | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | - // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] | - // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] | - // | | UnionExec | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | - // ... - // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - Ok(()) } diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index c0cfa46733f1..caeaeb3d6986 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -664,21 +664,13 @@ async fn test_union_inputs_different_sorted7() -> Result<()> { // Union has unnecessarily fine ordering below it. We should be able to replace them with absolutely necessary ordering. let test = EnforceSortingTest::new(physical_plan).with_repartition_sorts(true); assert_snapshot!(test.run(), @r" - Input Plan: + Input / Optimized Plan: SortPreservingMergeExec: [nullable_col@0 ASC] UnionExec SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false] DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false] DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet - - Optimized Plan: - SortPreservingMergeExec: [nullable_col@0 ASC] - UnionExec - SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet - SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet "); // Union preserves the inputs ordering, and we should not change any of the SortExecs under UnionExec diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 28d187bbf893..69593697e272 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -49,7 +49,7 @@ use crate::enforce_sorting::sort_pushdown::{ use crate::output_requirements::OutputRequirementExec; use crate::utils::{ add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit, - is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window, + is_repartition, is_sort, is_sort_preserving_merge, is_window, }; use crate::PhysicalOptimizerRule; @@ -516,10 +516,7 @@ pub fn ensure_sorting( ); child = update_sort_ctx_children_data(child, true)?; } - } else if physical_ordering.is_none() - || !plan.maintains_input_order()[idx] - || is_union(plan) - { + } else if physical_ordering.is_none() || !plan.maintains_input_order()[idx] { // We have a `SortExec` whose effect may be neutralized by another // order-imposing operator, remove this sort: child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;