diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index e2f3ece1e4ca..b400c442389c 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2869,7 +2869,7 @@ async fn test_count_wildcard_on_sort() -> Result<()> { assert_snapshot!( pretty_format_batches(&sql_results).unwrap(), - @r###" + @r" +---------------+------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+------------------------------------------------------------------------------------------------------------+ @@ -2889,12 +2889,12 @@ async fn test_count_wildcard_on_sort() -> Result<()> { | | DataSourceExec: partitions=1, partition_sizes=[1] | | | | +---------------+------------------------------------------------------------------------------------------------------------+ - "### + " ); assert_snapshot!( pretty_format_batches(&df_results).unwrap(), - @r###" + @r" +---------------+----------------------------------------------------------------------------+ | plan_type | plan | +---------------+----------------------------------------------------------------------------+ @@ -2910,7 +2910,7 @@ async fn test_count_wildcard_on_sort() -> Result<()> { | | DataSourceExec: partitions=1, partition_sizes=[1] | | | | +---------------+----------------------------------------------------------------------------+ - "### + " ); Ok(()) } diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index aa5a2d053926..a3d9a1e407c7 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -1474,19 +1474,19 @@ SortMergeJoin: join_type=..., on=[(a@0, c@2)] JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => { // TODO(wiedld): show different test result if enforce distribution first. assert_plan!(plan_sort, @r" -SortMergeJoin: join_type=..., on=[(a@0, c@2)] - SortMergeJoin: join_type=..., on=[(a@0, b1@1)] - RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1 - SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] - ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + SortMergeJoin: join_type=..., on=[(a@0, c@2)] + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); } // Should include 8 RepartitionExecs (4 hash, 8 round-robin), 4 SortExecs // Since ordering of the left child is not preserved after SortMergeJoin @@ -1500,22 +1500,22 @@ SortMergeJoin: join_type=..., on=[(a@0, c@2)] _ => { // TODO(wiedld): show different test result if enforce distribution first. assert_plan!(plan_sort, @r" -SortMergeJoin: join_type=..., on=[(a@0, c@2)] - RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] - CoalescePartitionsExec - SortMergeJoin: join_type=..., on=[(a@0, b1@1)] - RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1 - SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] - ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + SortMergeJoin: join_type=..., on=[(a@0, c@2)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); } } @@ -1581,40 +1581,40 @@ SortMergeJoin: join_type=..., on=[(b1@6, c@2)] JoinType::Inner | JoinType::Right => { // TODO(wiedld): show different test result if enforce distribution first. assert_plan!(plan_sort, @r" -SortMergeJoin: join_type=..., on=[(b1@6, c@2)] - SortMergeJoin: join_type=..., on=[(a@0, b1@1)] - RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1 - SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] - ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + SortMergeJoin: join_type=..., on=[(b1@6, c@2)] + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); } // Should include 8 RepartitionExecs (4 of them preserves order) and 4 SortExecs JoinType::Left | JoinType::Full => { // TODO(wiedld): show different test result if enforce distribution first. assert_plan!(plan_sort, @r" -SortMergeJoin: join_type=..., on=[(b1@6, c@2)] - RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=1 - SortExec: expr=[b1@6 ASC], preserve_partitioning=[false] - CoalescePartitionsExec - SortMergeJoin: join_type=..., on=[(a@0, b1@1)] - RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1 - SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] - ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + SortMergeJoin: join_type=..., on=[(b1@6, c@2)] + RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b1@6 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + SortMergeJoin: join_type=..., on=[(a@0, b1@1)] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b1@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); } // this match arm cannot be reached _ => unreachable!() @@ -1703,27 +1703,27 @@ SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)] // Test: result IS DIFFERENT, if EnforceSorting is run first: let plan_sort = test_config.to_plan(join, &SORT_DISTRIB_DISTRIB); assert_plan!(plan_sort, @r" -SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)] - RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=1 - SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[false] - CoalescePartitionsExec - ProjectionExec: expr=[a1@0 as a3, b1@1 as b3] - ProjectionExec: expr=[a1@1 as a1, b1@0 as b1] - AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[] - RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10 - AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=1 - SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[false] - CoalescePartitionsExec - ProjectionExec: expr=[a@1 as a2, b@0 as b2] - AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[] - RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10 - AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)] + RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + ProjectionExec: expr=[a1@0 as a3, b1@1 as b3] + ProjectionExec: expr=[a1@1 as a1, b1@0 as b1] + AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[] + RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10 + AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[] + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + ProjectionExec: expr=[a@1 as a2, b@0 as b2] + AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[] + RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10 + AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[] + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); Ok(()) } @@ -1980,12 +1980,12 @@ fn repartition_sorted_limit_with_filter() -> Result<()> { let plan_distrib = test_config.to_plan(plan.clone(), &DISTRIB_DISTRIB_SORT); assert_plan!(plan_distrib, @r" -SortRequiredExec: [c@2 ASC] - FilterExec: c@2 = 0 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + SortRequiredExec: [c@2 ASC] + FilterExec: c@2 = 0 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); // We can use repartition here, ordering requirement by SortRequiredExec // is still satisfied. let plan_sort = test_config.to_plan(plan, &SORT_DISTRIB_DISTRIB); @@ -2175,11 +2175,11 @@ fn repartition_does_not_destroy_sort() -> Result<()> { let plan_distrib = test_config.to_plan(plan.clone(), &DISTRIB_DISTRIB_SORT); assert_plan!(plan_distrib, @r" -SortRequiredExec: [d@3 ASC] - FilterExec: c@2 = 0 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet -"); + SortRequiredExec: [d@3 ASC] + FilterExec: c@2 = 0 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet + "); // during repartitioning ordering is preserved let plan_sort = test_config.to_plan(plan, &SORT_DISTRIB_DISTRIB); assert_plan!(plan_distrib, plan_sort); @@ -2740,14 +2740,14 @@ fn parallelization_limit_with_filter() -> Result<()> { // SortExec doesn't benefit from input partitioning assert_plan!(plan_parquet_distrib, @r" -GlobalLimitExec: skip=0, fetch=100 - CoalescePartitionsExec - LocalLimitExec: fetch=100 - FilterExec: c@2 = 0 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet -"); + GlobalLimitExec: skip=0, fetch=100 + CoalescePartitionsExec + LocalLimitExec: fetch=100 + FilterExec: c@2 = 0 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); let plan_parquet_sort = test_config.to_plan(plan_parquet, &SORT_DISTRIB_DISTRIB); assert_plan!(plan_parquet_distrib, plan_parquet_sort); @@ -2758,14 +2758,14 @@ GlobalLimitExec: skip=0, fetch=100 // SortExec doesn't benefit from input partitioning assert_plan!(plan_csv_distrib, @r" -GlobalLimitExec: skip=0, fetch=100 - CoalescePartitionsExec - LocalLimitExec: fetch=100 - FilterExec: c@2 = 0 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false -"); + GlobalLimitExec: skip=0, fetch=100 + CoalescePartitionsExec + LocalLimitExec: fetch=100 + FilterExec: c@2 = 0 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true + SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + "); let plan_csv_sort = test_config.to_plan(plan_csv, &SORT_DISTRIB_DISTRIB); assert_plan!(plan_csv_distrib, plan_csv_sort); diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index c0cfa46733f1..3bfc919a079c 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -1631,13 +1631,13 @@ async fn test_with_lost_ordering_unbounded() -> Result<()> { SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC] Optimized Plan: SortPreservingMergeExec: [a@0 ASC] RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC] "); @@ -1649,13 +1649,13 @@ async fn test_with_lost_ordering_unbounded() -> Result<()> { SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC] Optimized Plan: SortPreservingMergeExec: [a@0 ASC] RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC] "); @@ -1674,7 +1674,7 @@ async fn test_with_lost_ordering_bounded() -> Result<()> { SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=csv, has_header=false "); @@ -1686,14 +1686,14 @@ async fn test_with_lost_ordering_bounded() -> Result<()> { SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=csv, has_header=false Optimized Plan: SortPreservingMergeExec: [a@0 ASC] SortExec: expr=[a@0 ASC], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=csv, has_header=false "); @@ -1715,7 +1715,7 @@ async fn test_do_not_pushdown_through_spm() -> Result<()> { Input / Optimized Plan: SortExec: expr=[b@1 ASC], preserve_partitioning=[false] SortPreservingMergeExec: [a@0 ASC, b@1 ASC] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=csv, has_header=false "); @@ -1744,13 +1744,13 @@ async fn test_pushdown_through_spm() -> Result<()> { Input Plan: SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC], preserve_partitioning=[false] SortPreservingMergeExec: [a@0 ASC, b@1 ASC] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=csv, has_header=false Optimized Plan: SortPreservingMergeExec: [a@0 ASC, b@1 ASC] SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC], preserve_partitioning=[true] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=csv, has_header=false "); Ok(()) @@ -1774,7 +1774,7 @@ async fn test_window_multi_layer_requirement() -> Result<()> { BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] SortPreservingMergeExec: [a@0 ASC, b@1 ASC] RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC, b@1 ASC - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true SortExec: expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[false] DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false @@ -1969,7 +1969,7 @@ async fn test_remove_unnecessary_sort2() -> Result<()> { assert_snapshot!(test.run(), @r" Input Plan: RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10 - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC] SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false] @@ -2016,7 +2016,7 @@ async fn test_remove_unnecessary_sort3() -> Result<()> { AggregateExec: mode=Final, gby=[], aggr=[] SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC] SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[true] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true SortPreservingMergeExec: [non_nullable_col@1 ASC] SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false] DataSourceExec: partitions=1, partition_sizes=[0] @@ -2365,7 +2365,7 @@ async fn test_commutativity() -> Result<()> { assert_snapshot!(displayable(orig_plan.as_ref()).indent(true), @r#" SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] DataSourceExec: partitions=1, partition_sizes=[0] "#); diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index 066e52614a12..8fe35fc307df 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -192,7 +192,7 @@ async fn test_replace_multiple_input_repartition_1( SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -202,13 +202,13 @@ async fn test_replace_multiple_input_repartition_1( SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -218,13 +218,13 @@ async fn test_replace_multiple_input_repartition_1( SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -275,21 +275,21 @@ async fn test_with_inter_children_change_only( SortExec: expr=[a@0 ASC], preserve_partitioning=[true] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC] Optimized: SortPreservingMergeExec: [a@0 ASC] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortPreservingMergeExec: [a@0 ASC] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC] "); }, @@ -300,11 +300,11 @@ async fn test_with_inter_children_change_only( SortExec: expr=[a@0 ASC], preserve_partitioning=[true] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC "); }, @@ -315,21 +315,21 @@ async fn test_with_inter_children_change_only( SortExec: expr=[a@0 ASC], preserve_partitioning=[true] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC Optimized: SortPreservingMergeExec: [a@0 ASC] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortPreservingMergeExec: [a@0 ASC] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC "); } @@ -375,14 +375,14 @@ async fn test_replace_multiple_input_repartition_2( SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 FilterExec: c@1 > 3 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST FilterExec: c@1 > 3 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -393,7 +393,7 @@ async fn test_replace_multiple_input_repartition_2( SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 FilterExec: c@1 > 3 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -404,14 +404,14 @@ async fn test_replace_multiple_input_repartition_2( SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 FilterExec: c@1 > 3 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST FilterExec: c@1 > 3 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -460,7 +460,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: @@ -468,7 +468,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -480,7 +480,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -492,7 +492,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: @@ -500,7 +500,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -551,7 +551,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: @@ -560,7 +560,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -573,7 +573,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -586,7 +586,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: @@ -595,7 +595,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -639,7 +639,7 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -650,7 +650,7 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); // Expected bounded results same with and without flag, because there is no executor with ordering requirement @@ -662,7 +662,7 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -712,7 +712,7 @@ async fn test_with_multiple_replaceable_repartitions( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: @@ -721,7 +721,7 @@ async fn test_with_multiple_replaceable_repartitions( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -734,7 +734,7 @@ async fn test_with_multiple_replaceable_repartitions( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -747,7 +747,7 @@ async fn test_with_multiple_replaceable_repartitions( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: @@ -756,7 +756,7 @@ async fn test_with_multiple_replaceable_repartitions( CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -804,7 +804,7 @@ async fn test_not_replace_with_different_orderings( SortPreservingMergeExec: [c@1 ASC] SortExec: expr=[c@1 ASC], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -814,7 +814,7 @@ async fn test_not_replace_with_different_orderings( SortPreservingMergeExec: [c@1 ASC] SortExec: expr=[c@1 ASC], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); // Expected bounded results same with and without flag, because ordering requirement of the executor is @@ -826,7 +826,7 @@ async fn test_not_replace_with_different_orderings( SortPreservingMergeExec: [c@1 ASC] SortExec: expr=[c@1 ASC], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -870,13 +870,13 @@ async fn test_with_lost_ordering( SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -886,7 +886,7 @@ async fn test_with_lost_ordering( SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -896,13 +896,13 @@ async fn test_with_lost_ordering( SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -956,22 +956,22 @@ async fn test_with_lost_and_kept_ordering( SortExec: expr=[c@1 ASC], preserve_partitioning=[true] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[c@1 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: SortPreservingMergeExec: [c@1 ASC] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[c@1 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -982,11 +982,11 @@ async fn test_with_lost_and_kept_ordering( SortExec: expr=[c@1 ASC], preserve_partitioning=[true] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[c@1 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, @@ -997,22 +997,22 @@ async fn test_with_lost_and_kept_ordering( SortExec: expr=[c@1 ASC], preserve_partitioning=[true] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[c@1 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: SortPreservingMergeExec: [c@1 ASC] FilterExec: c@1 > 3 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true SortExec: expr=[c@1 ASC], preserve_partitioning=[false] CoalescePartitionsExec RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } @@ -1077,11 +1077,11 @@ async fn test_with_multiple_child_trees( HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)] CoalesceBatchesExec: target_batch_size=4096 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] CoalesceBatchesExec: target_batch_size=4096 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, @@ -1093,11 +1093,11 @@ async fn test_with_multiple_child_trees( HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)] CoalesceBatchesExec: target_batch_size=4096 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST CoalesceBatchesExec: target_batch_size=4096 RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); // Expected bounded results same with and without flag, because ordering get lost during intermediate executor anyway. diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs index 9867ed173341..f46147de1bfd 100644 --- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs @@ -556,10 +556,10 @@ async fn test_sort_merge_join_satisfied() -> Result<()> { actual, @r" SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)] - RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1 + RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1, maintains_sort_order=true SortExec: expr=[c9@0 ASC], preserve_partitioning=[false] DataSourceExec: partitions=1, partition_sizes=[0] - RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1, maintains_sort_order=true SortExec: expr=[a@0 ASC], preserve_partitioning=[false] DataSourceExec: partitions=1, partition_sizes=[0] " @@ -606,7 +606,7 @@ async fn test_sort_merge_join_order_missing() -> Result<()> { actual, @r" SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)] - RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1 + RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1, maintains_sort_order=true SortExec: expr=[c9@0 ASC], preserve_partitioning=[false] DataSourceExec: partitions=1, partition_sizes=[0] RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 @@ -654,10 +654,10 @@ async fn test_sort_merge_join_dist_missing() -> Result<()> { actual, @r" SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)] - RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1 + RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1, maintains_sort_order=true SortExec: expr=[c9@0 ASC], preserve_partitioning=[false] DataSourceExec: partitions=1, partition_sizes=[0] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true SortExec: expr=[a@0 ASC], preserve_partitioning=[false] DataSourceExec: partitions=1, partition_sizes=[0] " diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 7a38934d29a3..4a67046e933d 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -811,7 +811,7 @@ async fn test_physical_plan_display_indent_multi_children() { assert_snapshot!( actual, - @r###" + @r" CoalesceBatchesExec: target_batch_size=4096 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0] CoalesceBatchesExec: target_batch_size=4096 @@ -821,7 +821,7 @@ async fn test_physical_plan_display_indent_multi_children() { RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1 ProjectionExec: expr=[c1@0 as c2] DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true - "### + " ); } diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index b64a1e18f3f6..9b8d1ae75a38 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -73,10 +73,10 @@ async fn join_change_in_planner() -> Result<()> { @r" SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10 CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1 + RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST] CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1 + RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST] " ); @@ -132,10 +132,10 @@ async fn join_no_order_on_filter() -> Result<()> { @r" SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a3@0 AS Int64) > CAST(a3@1 AS Int64) + 3 AND CAST(a3@0 AS Int64) < CAST(a3@1 AS Int64) + 10 CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1 + RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a1, a2, a3], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST] CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1 + RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a1, a2, a3], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST] " ); diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 53cd24185ff8..3d3a5d6e2c21 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -739,6 +739,7 @@ impl RepartitionExec { impl DisplayAs for RepartitionExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + let input_partition_count = self.input.output_partitioning().partition_count(); match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( @@ -746,11 +747,15 @@ impl DisplayAs for RepartitionExec { "{}: partitioning={}, input_partitions={}", self.name(), self.partitioning(), - self.input.output_partitioning().partition_count() + input_partition_count, )?; if self.preserve_order { write!(f, ", preserve_order=true")?; + } else if input_partition_count <= 1 + && self.input.output_ordering().is_some() + { + write!(f, ", maintains_sort_order=true")?; } if let Some(sort_exprs) = self.sort_exprs() { @@ -760,9 +765,6 @@ impl DisplayAs for RepartitionExec { } DisplayFormatType::TreeRender => { writeln!(f, "partitioning_scheme={}", self.partitioning(),)?; - - let input_partition_count = - self.input.output_partitioning().partition_count(); let output_partition_count = self.partitioning().partition_count(); let input_to_output_partition_str = format!("{input_partition_count} -> {output_partition_count}"); @@ -773,6 +775,10 @@ impl DisplayAs for RepartitionExec { if self.preserve_order { writeln!(f, "preserve_order={}", self.preserve_order)?; + } else if input_partition_count <= 1 + && self.input.output_ordering().is_some() + { + write!(f, "maintains_sort_order=true")?; } Ok(()) } @@ -2431,7 +2437,7 @@ mod test { // Repartition should not preserve order assert_plan!(&exec, @r" - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC "); @@ -2668,7 +2674,7 @@ mod test { // Repartition should not preserve order assert_plan!(exec.as_ref(), @r" - RepartitionExec: partitioning=RoundRobinBatch(20), input_partitions=1 + RepartitionExec: partitioning=RoundRobinBatch(20), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC "); Ok(()) diff --git a/datafusion/sqllogictest/test_files/agg_func_substitute.slt b/datafusion/sqllogictest/test_files/agg_func_substitute.slt index 9aeaaacb1071..088c61047ea8 100644 --- a/datafusion/sqllogictest/test_files/agg_func_substitute.slt +++ b/datafusion/sqllogictest/test_files/agg_func_substitute.slt @@ -49,7 +49,7 @@ physical_plan 04)------CoalesceBatchesExec: target_batch_size=8192 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true @@ -69,7 +69,7 @@ physical_plan 04)------CoalesceBatchesExec: target_batch_size=8192 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT @@ -88,7 +88,7 @@ physical_plan 04)------CoalesceBatchesExec: target_batch_size=8192 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query II diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index e81bfb72a0ef..83faae0db595 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -7170,7 +7170,7 @@ physical_plan 01)AggregateExec: mode=Final, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]] 02)--CoalescePartitionsExec 03)----AggregateExec: mode=Partial, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]] -04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS LAST]], file_type=csv, has_header=true # test last to first @@ -7184,7 +7184,7 @@ physical_plan 01)AggregateExec: mode=Final, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]] 02)--CoalescePartitionsExec 03)----AggregateExec: mode=Partial, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]] -04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]], file_type=csv, has_header=true # test building plan with aggreagte sum diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 6ac28997a990..d507b7f2dfed 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -1430,18 +1430,21 @@ physical_plan 12)┌─────────────┴─────────────┐ 13)│ RepartitionExec │ 14)│ -------------------- │ -15)│ partition_count(in->out): │ -16)│ 1 -> 4 │ +15)│ maintains_sort_order: │ +16)│ true │ 17)│ │ -18)│ partitioning_scheme: │ -19)│ RoundRobinBatch(4) │ -20)└─────────────┬─────────────┘ -21)┌─────────────┴─────────────┐ -22)│ StreamingTableExec │ -23)│ -------------------- │ -24)│ infinite: true │ -25)│ limit: None │ -26)└───────────────────────────┘ +18)│ partition_count(in->out): │ +19)│ 1 -> 4 │ +20)│ │ +21)│ partitioning_scheme: │ +22)│ RoundRobinBatch(4) │ +23)└─────────────┬─────────────┘ +24)┌─────────────┴─────────────┐ +25)│ StreamingTableExec │ +26)│ -------------------- │ +27)│ infinite: true │ +28)│ limit: None │ +29)└───────────────────────────┘ # constant ticker, CAST(time AS DATE) = time, order by time @@ -1466,18 +1469,21 @@ physical_plan 13)┌─────────────┴─────────────┐ 14)│ RepartitionExec │ 15)│ -------------------- │ -16)│ partition_count(in->out): │ -17)│ 1 -> 4 │ +16)│ maintains_sort_order: │ +17)│ true │ 18)│ │ -19)│ partitioning_scheme: │ -20)│ RoundRobinBatch(4) │ -21)└─────────────┬─────────────┘ -22)┌─────────────┴─────────────┐ -23)│ StreamingTableExec │ -24)│ -------------------- │ -25)│ infinite: true │ -26)│ limit: None │ -27)└───────────────────────────┘ +19)│ partition_count(in->out): │ +20)│ 1 -> 4 │ +21)│ │ +22)│ partitioning_scheme: │ +23)│ RoundRobinBatch(4) │ +24)└─────────────┬─────────────┘ +25)┌─────────────┴─────────────┐ +26)│ StreamingTableExec │ +27)│ -------------------- │ +28)│ infinite: true │ +29)│ limit: None │ +30)└───────────────────────────┘ # same thing but order by date query TT @@ -1501,18 +1507,21 @@ physical_plan 13)┌─────────────┴─────────────┐ 14)│ RepartitionExec │ 15)│ -------------------- │ -16)│ partition_count(in->out): │ -17)│ 1 -> 4 │ +16)│ maintains_sort_order: │ +17)│ true │ 18)│ │ -19)│ partitioning_scheme: │ -20)│ RoundRobinBatch(4) │ -21)└─────────────┬─────────────┘ -22)┌─────────────┴─────────────┐ -23)│ StreamingTableExec │ -24)│ -------------------- │ -25)│ infinite: true │ -26)│ limit: None │ -27)└───────────────────────────┘ +19)│ partition_count(in->out): │ +20)│ 1 -> 4 │ +21)│ │ +22)│ partitioning_scheme: │ +23)│ RoundRobinBatch(4) │ +24)└─────────────┬─────────────┘ +25)┌─────────────┴─────────────┐ +26)│ StreamingTableExec │ +27)│ -------------------- │ +28)│ infinite: true │ +29)│ limit: None │ +30)└───────────────────────────┘ # same thing but order by ticker query TT @@ -1534,18 +1543,21 @@ physical_plan 11)┌─────────────┴─────────────┐ 12)│ RepartitionExec │ 13)│ -------------------- │ -14)│ partition_count(in->out): │ -15)│ 1 -> 4 │ +14)│ maintains_sort_order: │ +15)│ true │ 16)│ │ -17)│ partitioning_scheme: │ -18)│ RoundRobinBatch(4) │ -19)└─────────────┬─────────────┘ -20)┌─────────────┴─────────────┐ -21)│ StreamingTableExec │ -22)│ -------------------- │ -23)│ infinite: true │ -24)│ limit: None │ -25)└───────────────────────────┘ +17)│ partition_count(in->out): │ +18)│ 1 -> 4 │ +19)│ │ +20)│ partitioning_scheme: │ +21)│ RoundRobinBatch(4) │ +22)└─────────────┬─────────────┘ +23)┌─────────────┴─────────────┐ +24)│ StreamingTableExec │ +25)│ -------------------- │ +26)│ infinite: true │ +27)│ limit: None │ +28)└───────────────────────────┘ # same thing but order by time, date @@ -1571,18 +1583,21 @@ physical_plan 14)┌─────────────┴─────────────┐ 15)│ RepartitionExec │ 16)│ -------------------- │ -17)│ partition_count(in->out): │ -18)│ 1 -> 4 │ +17)│ maintains_sort_order: │ +18)│ true │ 19)│ │ -20)│ partitioning_scheme: │ -21)│ RoundRobinBatch(4) │ -22)└─────────────┬─────────────┘ -23)┌─────────────┴─────────────┐ -24)│ StreamingTableExec │ -25)│ -------------------- │ -26)│ infinite: true │ -27)│ limit: None │ -28)└───────────────────────────┘ +20)│ partition_count(in->out): │ +21)│ 1 -> 4 │ +22)│ │ +23)│ partitioning_scheme: │ +24)│ RoundRobinBatch(4) │ +25)└─────────────┬─────────────┘ +26)┌─────────────┴─────────────┐ +27)│ StreamingTableExec │ +28)│ -------------------- │ +29)│ infinite: true │ +30)│ limit: None │ +31)└───────────────────────────┘ @@ -1611,18 +1626,21 @@ physical_plan 15)┌─────────────┴─────────────┐ 16)│ RepartitionExec │ 17)│ -------------------- │ -18)│ partition_count(in->out): │ -19)│ 1 -> 4 │ +18)│ maintains_sort_order: │ +19)│ true │ 20)│ │ -21)│ partitioning_scheme: │ -22)│ RoundRobinBatch(4) │ -23)└─────────────┬─────────────┘ -24)┌─────────────┴─────────────┐ -25)│ StreamingTableExec │ -26)│ -------------------- │ -27)│ infinite: true │ -28)│ limit: None │ -29)└───────────────────────────┘ +21)│ partition_count(in->out): │ +22)│ 1 -> 4 │ +23)│ │ +24)│ partitioning_scheme: │ +25)│ RoundRobinBatch(4) │ +26)└─────────────┬─────────────┘ +27)┌─────────────┴─────────────┐ +28)│ StreamingTableExec │ +29)│ -------------------- │ +30)│ infinite: true │ +31)│ limit: None │ +32)└───────────────────────────┘ @@ -1649,18 +1667,21 @@ physical_plan 13)┌─────────────┴─────────────┐ 14)│ RepartitionExec │ 15)│ -------------------- │ -16)│ partition_count(in->out): │ -17)│ 1 -> 4 │ +16)│ maintains_sort_order: │ +17)│ true │ 18)│ │ -19)│ partitioning_scheme: │ -20)│ RoundRobinBatch(4) │ -21)└─────────────┬─────────────┘ -22)┌─────────────┴─────────────┐ -23)│ StreamingTableExec │ -24)│ -------------------- │ -25)│ infinite: true │ -26)│ limit: None │ -27)└───────────────────────────┘ +19)│ partition_count(in->out): │ +20)│ 1 -> 4 │ +21)│ │ +22)│ partitioning_scheme: │ +23)│ RoundRobinBatch(4) │ +24)└─────────────┬─────────────┘ +25)┌─────────────┴─────────────┐ +26)│ StreamingTableExec │ +27)│ -------------------- │ +28)│ infinite: true │ +29)│ limit: None │ +30)└───────────────────────────┘ diff --git a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt index be79f1423859..ec069212f558 100644 --- a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt +++ b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt @@ -39,7 +39,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [date@0 ASC NULLS LAST, time@2 ASC NULLS LAST] 02)--FilterExec: ticker@1 = A -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] # constant ticker, CAST(time AS DATE) = time, order by time @@ -55,7 +55,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [time@2 ASC NULLS LAST] 02)--FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] # same thing but order by date @@ -71,7 +71,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [date@0 ASC NULLS LAST] 02)--FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] # same thing but order by ticker @@ -87,7 +87,7 @@ logical_plan physical_plan 01)CoalescePartitionsExec 02)--FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] # same thing but order by time, date @@ -103,7 +103,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [time@2 ASC NULLS LAST, date@0 ASC NULLS LAST] 02)--FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] # CAST(time AS DATE) <> date (should require a sort) @@ -143,5 +143,5 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] 02)--FilterExec: date@0 = 2006-01-02 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index b74815edaa57..ddf081689e9e 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -3870,7 +3870,7 @@ physical_plan 03)----CoalesceBatchesExec: target_batch_size=2 04)------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8 05)--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[first_value(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST], first_value(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]] -06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query II rowsort @@ -3982,7 +3982,7 @@ physical_plan 03)----CoalesceBatchesExec: target_batch_size=2 04)------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8 05)--------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) -06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true # drop table multiple_ordered_table_with_pk @@ -4023,7 +4023,7 @@ physical_plan 03)----CoalesceBatchesExec: target_batch_size=2 04)------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8 05)--------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) -06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true statement ok @@ -4297,7 +4297,7 @@ physical_plan 04)------CoalesceBatchesExec: target_batch_size=2 05)--------RepartitionExec: partitioning=Hash([date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)@0], 8), input_partitions=8, preserve_order=true, sort_exprs=date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)@0 DESC 06)----------AggregateExec: mode=Partial, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@0) as date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted -07)------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +07)------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 08)--------------StreamingTableExec: partition_sizes=1, projection=[ts], infinite_source=true, output_ordering=[ts@0 DESC] query P @@ -4352,7 +4352,7 @@ physical_plan 05)--------CoalesceBatchesExec: target_batch_size=2 06)----------RepartitionExec: partitioning=Hash([date_part(Utf8("MONTH"),csv_with_timestamps.ts)@0], 8), input_partitions=8 07)------------AggregateExec: mode=Partial, gby=[date_part(MONTH, ts@0) as date_part(Utf8("MONTH"),csv_with_timestamps.ts)], aggr=[] -08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 DESC], file_type=csv, has_header=false query I @@ -4392,7 +4392,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [name@0 DESC, time_chunks@1 DESC], fetch=5 02)--ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks] -03)----RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true 04)------StreamingTableExec: partition_sizes=1, projection=[name, ts], infinite_source=true, output_ordering=[name@0 DESC, ts@1 DESC] statement ok diff --git a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt index e78169734fe5..65e8d3189f00 100644 --- a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt +++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt @@ -58,7 +58,7 @@ physical_plan 02)--CoalesceBatchesExec: target_batch_size=8192, fetch=5 03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)], projection=[a@1] 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], file_type=csv, has_header=true -05)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 06)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], file_type=csv, has_header=true # preserve_inner_join @@ -100,7 +100,7 @@ physical_plan 04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1] 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], file_type=csv, has_header=true 06)--------FilterExec: d@3 = 3 -07)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 08)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true # preserve_right_semi_join diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 4a243258a519..f217ba1bd5a0 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3189,12 +3189,12 @@ physical_plan 01)SortPreservingMergeExec: [rn1@5 ASC NULLS LAST] 02)--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)] 03)----CoalesceBatchesExec: target_batch_size=2 -04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 +04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1, maintains_sort_order=true 05)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] 06)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true 08)----CoalesceBatchesExec: target_batch_size=2 -09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 +09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1, maintains_sort_order=true 10)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # sort merge join should propagate ordering equivalence of the right side @@ -3221,10 +3221,10 @@ physical_plan 01)SortPreservingMergeExec: [rn1@10 ASC NULLS LAST] 02)--SortMergeJoin: join_type=Right, on=[(a@1, a@1)] 03)----CoalesceBatchesExec: target_batch_size=2 -04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 +04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true 06)----CoalesceBatchesExec: target_batch_size=2 -07)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 +07)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1, maintains_sort_order=true 08)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] 09)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 10)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true @@ -3260,12 +3260,12 @@ physical_plan 01)SortPreservingMergeExec: [a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, rn1@11 ASC NULLS LAST] 02)--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)] 03)----CoalesceBatchesExec: target_batch_size=2 -04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 +04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1, maintains_sort_order=true 05)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] 06)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true 08)----CoalesceBatchesExec: target_batch_size=2 -09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 +09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1, maintains_sort_order=true 10)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] 11)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 12)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true @@ -3463,10 +3463,10 @@ physical_plan 07)------------CoalesceBatchesExec: target_batch_size=2 08)--------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] 09)----------------CoalesceBatchesExec: target_batch_size=2 -10)------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=1 +10)------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=1, maintains_sort_order=true 11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true 12)----------------CoalesceBatchesExec: target_batch_size=2 -13)------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=1 +13)------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=1, maintains_sort_order=true 14)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], file_type=csv, has_header=true query TT @@ -3483,7 +3483,7 @@ logical_plan physical_plan 01)NestedLoopJoinExec: join_type=Inner, filter=a@1 < a@0 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true -03)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # Currently datafusion can pushdown filter conditions with scalar UDF into @@ -3504,7 +3504,7 @@ physical_plan 02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, CAST(a@1 AS Float64) as join_proj_push_down_1] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true 04)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, CAST(a@1 AS Float64) as join_proj_push_down_2] -05)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +05)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 06)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true #### diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 48781e46f11c..a2b6eae7d966 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -664,7 +664,7 @@ physical_plan 03)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] 04)------UnionExec 05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c] -06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true 08)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c] 09)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index 9c806cfa0d8a..d94d48d45af9 100644 --- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -46,7 +46,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a_big@0 ASC NULLS LAST, b@1 ASC NULLS LAST] 02)--ProjectionExec: expr=[CAST(a@0 AS Int64) as a_big, b@1 as b] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true query TT @@ -62,7 +62,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST] 02)--ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as a_big, b@1 as b] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true # Cast to larger types as well as preserving ordering @@ -83,7 +83,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a_big@1 ASC NULLS LAST, b@2 ASC NULLS LAST] 02)--ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as a_big, b@1 as b] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true # test for common rename @@ -135,7 +135,7 @@ physical_plan 01)SortPreservingMergeExec: [a_str@0 ASC NULLS LAST, b@1 ASC NULLS LAST] 02)--SortExec: expr=[a_str@0 ASC NULLS LAST, b@1 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[CAST(a@0 AS Utf8View) as a_str, b@1 as b] -04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true # We cannot determine a+b is ordered from the @@ -170,5 +170,5 @@ physical_plan 01)SortPreservingMergeExec: [sum_expr@0 ASC NULLS LAST] 02)--SortExec: expr=[sum_expr@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[CAST(a@0 + b@1 AS Int64) as sum_expr, a@0 as a, b@1 as b] -04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index b1d02f6dc16e..8f0cb5e53d76 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -561,7 +561,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [result@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[b@1 + a@0 + c@2 as result] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST], [b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true statement ok @@ -592,7 +592,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [db15@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@0, 1659537600000000000) as db15] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 ASC NULLS LAST], file_type=csv, has_header=false query TT @@ -607,7 +607,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [dt_day@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[date_trunc(DAY, ts@0) as dt_day] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 ASC NULLS LAST], file_type=csv, has_header=false statement ok @@ -650,7 +650,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [atan_c11@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[atan(c11@0) as atan_c11] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], file_type=csv, has_header=true query TT @@ -665,7 +665,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [ceil_c11@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[ceil(c11@0) as ceil_c11] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], file_type=csv, has_header=true query TT @@ -680,7 +680,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [log_c11_base_c12@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[log(c12@1, c11@0) as log_c11_base_c12] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC NULLS LAST]], file_type=csv, has_header=true query TT @@ -695,7 +695,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [log_c12_base_c11@0 DESC NULLS LAST] 02)--ProjectionExec: expr=[log(c11@0, c12@1) as log_c12_base_c11] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC NULLS LAST]], file_type=csv, has_header=true statement ok @@ -1143,7 +1143,7 @@ physical_plan 01)SortPreservingMergeExec: [c_str@0 ASC NULLS LAST], fetch=5 02)--SortExec: TopK(fetch=5), expr=[c_str@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[CAST(c@0 AS Utf8View) as c_str] -04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true @@ -1173,7 +1173,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [c_bigint@0 ASC NULLS LAST], fetch=5 02)--ProjectionExec: expr=[CAST(c@0 AS Int64) as c_bigint] -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true statement ok @@ -1209,7 +1209,7 @@ physical_plan 01)SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5 02)--SortExec: TopK(fetch=5), expr=[abs_c@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[abs(c@0) as abs_c] -04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true statement ok @@ -1243,7 +1243,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5 02)--ProjectionExec: expr=[abs(c@0) as abs_c] -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true # Boolean to integer casts preserve the order. @@ -1269,7 +1269,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [c@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[CAST(inc_col@0 > desc_col@1 AS Int32) as c] -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[inc_col, desc_col], output_orderings=[[inc_col@0 ASC NULLS LAST], [desc_col@1 DESC]], file_type=csv, has_header=true # Union a query with the actual data and one with a constant @@ -1292,7 +1292,7 @@ logical_plan 03)----TableScan: ordered_table projection=[a, b] physical_plan 01)ProjectionExec: expr=[a@0 + b@1 as sum1] -02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], file_type=csv, has_header=true @@ -1331,7 +1331,7 @@ logical_plan 03)----TableScan: ordered_table projection=[a, b] physical_plan 01)ProjectionExec: expr=[a@0 + b@1 as sum1] -02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/qualify.slt b/datafusion/sqllogictest/test_files/qualify.slt index d4de5f9a9a61..8ced344e0f2a 100644 --- a/datafusion/sqllogictest/test_files/qualify.slt +++ b/datafusion/sqllogictest/test_files/qualify.slt @@ -351,7 +351,7 @@ physical_plan 02)--SortExec: expr=[dept@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[dept@0 as dept, sum(users.salary)@1 as s] 04)------FilterExec: rank() ORDER BY [sum(users.salary) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 = 1, projection=[dept@0, sum(users.salary)@1] -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 06)----------BoundedWindowAggExec: wdw=[rank() ORDER BY [sum(users.salary) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() ORDER BY [sum(users.salary) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 07)------------SortPreservingMergeExec: [sum(users.salary)@1 DESC] 08)--------------SortExec: expr=[sum(users.salary)@1 DESC], preserve_partitioning=[true] diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index 21878d36db02..30dbcc978c9b 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -1404,7 +1404,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[a@0 as a, a@0 + b@1 as annotated_data_finite2.a + annotated_data_finite2.b] -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true # since query below doesn't computation @@ -1441,7 +1441,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] 02)--FilterExec: a@1 = 0 -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # source is ordered by a,b,c @@ -1461,7 +1461,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [c@3 ASC NULLS LAST] 02)--FilterExec: a@1 = 0 AND b@2 = 0 -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # source is ordered by a,b,c @@ -1481,7 +1481,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] 02)--FilterExec: a@1 = 0 AND b@2 = 0 -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # source is ordered by a,b,c @@ -1501,7 +1501,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] 02)--FilterExec: a@1 = 0 AND b@2 = 0 -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # source is ordered by a,b,c @@ -1522,7 +1522,7 @@ physical_plan 01)SortPreservingMergeExec: [c@3 ASC NULLS LAST] 02)--SortExec: expr=[c@3 ASC NULLS LAST], preserve_partitioning=[true] 03)----FilterExec: a@1 = 0 OR b@2 = 0 -04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true # When ordering lost during projection, we shouldn't keep the SortExec. @@ -1569,7 +1569,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [CAST(round(CAST(b@2 AS Float64)) AS Int32) ASC NULLS LAST] 02)--FilterExec: CAST(round(CAST(b@2 AS Float64)) AS Int32) = a@1 -03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index aa87026c5cf3..5f9276bdb78e 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -939,4 +939,4 @@ SELECT t2.a, t2.b, t2.c FROM t2 WHERE t2.a > 3 OR t2.a IN (SELECT t3.x FROM t3 WHERE t2.b < 150) ---- -4 101 1001 \ No newline at end of file +4 101 1001 diff --git a/datafusion/sqllogictest/test_files/spark/math/csc.slt b/datafusion/sqllogictest/test_files/spark/math/csc.slt index 5eb9f4447280..837704113da4 100644 --- a/datafusion/sqllogictest/test_files/spark/math/csc.slt +++ b/datafusion/sqllogictest/test_files/spark/math/csc.slt @@ -43,4 +43,4 @@ SELECT csc(a) FROM (VALUES (pi()), (-pi()), (pi()/2) , (arrow_cast('NAN','Float3 8165619676597685 -8165619676597685 1 -NaN \ No newline at end of file +NaN diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 8a08cc17d417..6c4d7be5ab8a 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -383,7 +383,7 @@ physical_plan 02)--SortExec: TopK(fetch=3), expr=[number_plus@0 DESC, number@1 DESC, age@3 ASC NULLS LAST], preserve_partitioning=[true], sort_prefix=[number_plus@0 DESC, number@1 DESC] 03)----ProjectionExec: expr=[__common_expr_1@0 as number_plus, number@1 as number, __common_expr_1@0 as other_number_plus, age@2 as age] 04)------ProjectionExec: expr=[CAST(number@0 AS Int64) + 1 as __common_expr_1, number@0 as number, age@1 as age] -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, age], output_ordering=[number@0 DESC], file_type=parquet # Cleanup diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index e650116a69b5..c4319c665bd0 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -968,7 +968,7 @@ physical_plan 07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar] 08)--------------UnnestExec 09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))] -10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 12)----------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192] diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 2e228472d68c..b7ef74e6c167 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -1927,7 +1927,7 @@ physical_plan 03)----ProjectionExec: expr=[c3@0 as c3, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum1, sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum2] 04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 05)--------CoalesceBatchesExec: target_batch_size=4096 -06)----------RepartitionExec: partitioning=Hash([c3@0], 2), input_partitions=1 +06)----------RepartitionExec: partitioning=Hash([c3@0], 2), input_partitions=1, maintains_sort_order=true 07)------------ProjectionExec: expr=[c3@1 as c3, c9@2 as c9, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 08)--------------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 09)----------------SortExec: expr=[c3@1 DESC, c9@2 DESC, c2@0 ASC NULLS LAST], preserve_partitioning=[false] @@ -2144,7 +2144,7 @@ physical_plan 02)--AggregateExec: mode=Final, gby=[], aggr=[array_agg(aggregate_test_100.c13)] 03)----CoalescePartitionsExec 04)------AggregateExec: mode=Partial, gby=[], aggr=[array_agg(aggregate_test_100.c13)] -05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 06)----------SortExec: TopK(fetch=1), expr=[c13@0 ASC NULLS LAST], preserve_partitioning=[false] 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c13], file_type=csv, has_header=true @@ -3371,7 +3371,7 @@ physical_plan 13)------------------------CoalesceBatchesExec: target_batch_size=4096 14)--------------------------RepartitionExec: partitioning=Hash([a@1, b@2], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST 15)----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as __common_expr_1, a@0 as a, b@1 as b, c@2 as c, d@3 as d] -16)------------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +16)------------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 17)--------------------------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST] # reset the partition number 1 again @@ -3716,7 +3716,7 @@ physical_plan 02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, avg(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW@5 as avg_d] 03)----BoundedWindowAggExec: wdw=[avg(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW: Field { "avg(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW": nullable Float64 }, frame: RANGE BETWEEN 10 PRECEDING AND CURRENT ROW], mode=[Linear] 04)------CoalesceBatchesExec: target_batch_size=4096 -05)--------RepartitionExec: partitioning=Hash([d@4], 2), input_partitions=1 +05)--------RepartitionExec: partitioning=Hash([d@4], 2), input_partitions=1, maintains_sort_order=true 06)----------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]] # CTAS with NTILE function @@ -5576,7 +5576,7 @@ physical_plan 02)--ProjectionExec: expr=[c1@0 as c1, sum(aggregate_test_100_ordered.c9) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@2 as sum_c9] 03)----WindowAggExec: wdw=[sum(aggregate_test_100_ordered.c9) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(aggregate_test_100_ordered.c9) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: true }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] 04)------CoalesceBatchesExec: target_batch_size=1 -05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=1 +05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=1, maintains_sort_order=true 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true query TT @@ -5606,7 +5606,7 @@ physical_plan 02)--ProjectionExec: expr=[c1@0 as c1, min(aggregate_test_100_ordered.c5) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@2 as min_c5] 03)----WindowAggExec: wdw=[min(aggregate_test_100_ordered.c5) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "min(aggregate_test_100_ordered.c5) PARTITION BY [aggregate_test_100_ordered.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] 04)------CoalesceBatchesExec: target_batch_size=1 -05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=1 +05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=1, maintains_sort_order=true 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c5], output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true query TT