From 2858e34d3e5348b6963c4ff73808f50361d6b96a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 4 Feb 2022 11:32:29 -0500 Subject: [PATCH] Do not repartition sorted inputs `SortPreservingMerge` --- .../src/physical_optimizer/repartition.rs | 35 ++++++++++++++++++- .../sorts/sort_preserving_merge.rs | 6 ++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 1f4505324aa3..6f6d67854769 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -109,11 +109,12 @@ mod tests { use super::*; use crate::datasource::PartitionedFile; - use crate::physical_plan::expressions::col; + use crate::physical_plan::expressions::{col, PhysicalSortExpr}; use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; + use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::{displayable, Statistics}; use crate::test::object_store::TestObjectStore; @@ -137,6 +138,17 @@ mod tests { )) } + fn sort_preserving_merge_exec( + input: Arc, + ) -> Arc { + let expr = vec![PhysicalSortExpr { + expr: col("c1", &schema()).unwrap(), + options: arrow::compute::SortOptions::default(), + }]; + + Arc::new(SortPreservingMergeExec::new(expr, input)) + } + fn filter_exec(input: Arc) -> Arc { Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap()) } @@ -276,4 +288,25 @@ mod tests { assert_eq!(&trim_plan_display(&plan), &expected); Ok(()) } + + #[test] + fn repartition_ignores_sort_preserving_merge() -> Result<()> { + let optimizer = Repartition {}; + + let optimized = optimizer.optimize( + sort_preserving_merge_exec(parquet_exec()), + &ExecutionConfig::new().with_target_partitions(5), + )?; + + let plan = displayable(optimized.as_ref()).indent().to_string(); + + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + // Expect no repartition of SortPreservingMergeExec + "ParquetExec: limit=None, partitions=[x]", + ]; + + assert_eq!(&trim_plan_display(&plan), &expected); + Ok(()) + } } diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index ddc9ff1f9e47..d015e0c51f39 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -128,6 +128,12 @@ impl ExecutionPlan for SortPreservingMergeExec { Distribution::UnspecifiedDistribution } + fn should_repartition_children(&self) -> bool { + // if the children are repartitioned they may no longer remain + // sorted + false + } + fn children(&self) -> Vec> { vec![self.input.clone()] }