Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2869,7 +2869,7 @@ async fn test_count_wildcard_on_sort() -> Result<()> {

assert_snapshot!(
pretty_format_batches(&sql_results).unwrap(),
@r###"
@r"
+---------------+------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------+
Expand All @@ -2889,12 +2889,12 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------+
"###
"
);

assert_snapshot!(
pretty_format_batches(&df_results).unwrap(),
@r###"
@r"
+---------------+----------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------+
Expand All @@ -2910,7 +2910,7 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | |
+---------------+----------------------------------------------------------------------------+
"###
"
);
Ok(())
}
Expand Down
212 changes: 106 additions & 106 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Large diffs are not rendered by default.

28 changes: 14 additions & 14 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1631,13 +1631,13 @@ async fn test_with_lost_ordering_unbounded() -> Result<()> {
SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
CoalescePartitionsExec
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]

Optimized Plan:
SortPreservingMergeExec: [a@0 ASC]
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]
");

Expand All @@ -1649,13 +1649,13 @@ async fn test_with_lost_ordering_unbounded() -> Result<()> {
SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
CoalescePartitionsExec
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]

Optimized Plan:
SortPreservingMergeExec: [a@0 ASC]
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]
");

Expand All @@ -1674,7 +1674,7 @@ async fn test_with_lost_ordering_bounded() -> Result<()> {
SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
CoalescePartitionsExec
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=csv, has_header=false
");

Expand All @@ -1686,14 +1686,14 @@ async fn test_with_lost_ordering_bounded() -> Result<()> {
SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
CoalescePartitionsExec
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=csv, has_header=false

Optimized Plan:
SortPreservingMergeExec: [a@0 ASC]
SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=csv, has_header=false
");

Expand All @@ -1715,7 +1715,7 @@ async fn test_do_not_pushdown_through_spm() -> Result<()> {
Input / Optimized Plan:
SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
SortPreservingMergeExec: [a@0 ASC, b@1 ASC]
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=csv, has_header=false
");

Expand Down Expand Up @@ -1744,13 +1744,13 @@ async fn test_pushdown_through_spm() -> Result<()> {
Input Plan:
SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC], preserve_partitioning=[false]
SortPreservingMergeExec: [a@0 ASC, b@1 ASC]
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=csv, has_header=false

Optimized Plan:
SortPreservingMergeExec: [a@0 ASC, b@1 ASC]
SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC], preserve_partitioning=[true]
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=csv, has_header=false
");
Ok(())
Expand All @@ -1774,7 +1774,7 @@ async fn test_window_multi_layer_requirement() -> Result<()> {
BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
SortPreservingMergeExec: [a@0 ASC, b@1 ASC]
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC, b@1 ASC
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
SortExec: expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false

Expand Down Expand Up @@ -1969,7 +1969,7 @@ async fn test_remove_unnecessary_sort2() -> Result<()> {
assert_snapshot!(test.run(), @r"
Input Plan:
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]
SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]
Expand Down Expand Up @@ -2016,7 +2016,7 @@ async fn test_remove_unnecessary_sort3() -> Result<()> {
AggregateExec: mode=Final, gby=[], aggr=[]
SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]
SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[true]
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
SortPreservingMergeExec: [non_nullable_col@1 ASC]
SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]
DataSourceExec: partitions=1, partition_sizes=[0]
Expand Down Expand Up @@ -2365,7 +2365,7 @@ async fn test_commutativity() -> Result<()> {

assert_snapshot!(displayable(orig_plan.as_ref()).indent(true), @r#"
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
DataSourceExec: partitions=1, partition_sizes=[0]
"#);
Expand Down
Loading