From 2858e34d3e5348b6963c4ff73808f50361d6b96a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 4 Feb 2022 11:32:29 -0500 Subject: [PATCH 01/11] Do not repartition sorted inputs `SortPreservingMerge` --- .../src/physical_optimizer/repartition.rs | 35 ++++++++++++++++++- .../sorts/sort_preserving_merge.rs | 6 ++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 1f4505324aa3..6f6d67854769 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -109,11 +109,12 @@ mod tests { use super::*; use crate::datasource::PartitionedFile; - use crate::physical_plan::expressions::col; + use crate::physical_plan::expressions::{col, PhysicalSortExpr}; use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; + use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::{displayable, Statistics}; use crate::test::object_store::TestObjectStore; @@ -137,6 +138,17 @@ mod tests { )) } + fn sort_preserving_merge_exec( + input: Arc, + ) -> Arc { + let expr = vec![PhysicalSortExpr { + expr: col("c1", &schema()).unwrap(), + options: arrow::compute::SortOptions::default(), + }]; + + Arc::new(SortPreservingMergeExec::new(expr, input)) + } + fn filter_exec(input: Arc) -> Arc { Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap()) } @@ -276,4 +288,25 @@ mod tests { assert_eq!(&trim_plan_display(&plan), &expected); Ok(()) } + + #[test] + fn repartition_ignores_sort_preserving_merge() -> Result<()> { + let optimizer = Repartition {}; + + let optimized = optimizer.optimize( + sort_preserving_merge_exec(parquet_exec()), + &ExecutionConfig::new().with_target_partitions(5), + )?; + + let plan = displayable(optimized.as_ref()).indent().to_string(); + + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + // Expect no repartition of SortPreservingMergeExec + "ParquetExec: limit=None, partitions=[x]", + ]; + + assert_eq!(&trim_plan_display(&plan), &expected); + Ok(()) + } } diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index ddc9ff1f9e47..d015e0c51f39 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -128,6 +128,12 @@ impl ExecutionPlan for SortPreservingMergeExec { Distribution::UnspecifiedDistribution } + fn should_repartition_children(&self) -> bool { + // if the children are repartitioned they may no longer remain + // sorted + false + } + fn children(&self) -> Vec> { vec![self.input.clone()] } From 463c04891eff85d26d63d19a1a5aeed74b00b2f9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 7 Feb 2022 12:12:16 -0500 Subject: [PATCH 02/11] Add notion of sortedness to `ExecutionPlan`, use to avoid repartitioning when that would result in incorrect behavior --- .../src/physical_optimizer/repartition.rs | 361 +++++++++++++++--- datafusion/src/physical_plan/analyze.rs | 5 + .../src/physical_plan/coalesce_batches.rs | 5 + .../src/physical_plan/coalesce_partitions.rs | 5 + datafusion/src/physical_plan/cross_join.rs | 5 + datafusion/src/physical_plan/empty.rs | 5 + datafusion/src/physical_plan/explain.rs | 6 +- .../src/physical_plan/file_format/avro.rs | 5 + .../src/physical_plan/file_format/csv.rs | 5 + .../src/physical_plan/file_format/json.rs | 5 + .../src/physical_plan/file_format/parquet.rs | 5 + datafusion/src/physical_plan/filter.rs | 10 + .../src/physical_plan/hash_aggregate.rs | 9 + datafusion/src/physical_plan/hash_join.rs | 5 + datafusion/src/physical_plan/limit.rs | 40 +- datafusion/src/physical_plan/memory.rs | 5 + datafusion/src/physical_plan/mod.rs | 55 ++- datafusion/src/physical_plan/planner.rs | 4 + datafusion/src/physical_plan/projection.rs | 11 +- datafusion/src/physical_plan/repartition.rs | 5 + datafusion/src/physical_plan/sorts/sort.rs | 8 + .../sorts/sort_preserving_merge.rs | 10 +- datafusion/src/physical_plan/union.rs | 7 +- datafusion/src/physical_plan/values.rs | 5 + .../physical_plan/windows/window_agg_exec.rs | 9 + datafusion/src/test/exec.rs | 24 +- datafusion/tests/custom_sources.rs | 4 + datafusion/tests/provider_filter_pushdown.rs | 5 + datafusion/tests/statistics.rs | 9 +- datafusion/tests/user_defined_plan.rs | 5 + 30 files changed, 565 insertions(+), 77 deletions(-) diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 6f6d67854769..30f4c5b0e038 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -19,13 +19,105 @@ use std::sync::Arc; use super::optimizer::PhysicalOptimizerRule; -use crate::physical_plan::Partitioning::*; use crate::physical_plan::{ empty::EmptyExec, repartition::RepartitionExec, ExecutionPlan, }; +use crate::physical_plan::{Distribution, Partitioning::*}; use crate::{error::Result, execution::context::ExecutionConfig}; -/// Optimizer that introduces repartition to introduce more parallelism in the plan +/// Optimizer that introduces repartition to introduce more +/// parallelism in the plan +/// +/// For example, given an input such as: +/// +/// +/// ```text +/// ┌─────────────────────────────────┐ +/// │ │ +/// │ ExecutionPlan │ +/// │ │ +/// └─────────────────────────────────┘ +/// ▲ ▲ +/// │ │ +/// ┌─────┘ └─────┐ +/// │ │ +/// │ │ +/// │ │ +/// ┌───────────┐ ┌───────────┐ +/// │ │ │ │ +/// │ batch A1 │ │ batch B1 │ +/// │ │ │ │ +/// ├───────────┤ ├───────────┤ +/// │ │ │ │ +/// │ batch A2 │ │ batch B2 │ +/// │ │ │ │ +/// ├───────────┤ ├───────────┤ +/// │ │ │ │ +/// │ batch A3 │ │ batch B3 │ +/// │ │ │ │ +/// └───────────┘ └───────────┘ +/// +/// Input Input +/// A B +/// ``` +/// +/// This optimizer will attempt to add a `RepartitionExec` to increase +/// the parallism (to 3 in this case) +/// +/// ```text +/// ┌─────────────────────────────────┐ +/// │ │ +/// │ ExecutionPlan │ +/// │ │ +/// └─────────────────────────────────┘ +/// ▲ ▲ ▲ Input now has 3 +/// │ │ │ partitions +/// ┌───────┘ │ └───────┐ +/// │ │ │ +/// │ │ │ +/// ┌───────────┐ ┌───────────┐ ┌───────────┐ +/// │ │ │ │ │ │ +/// │ batch A1 │ │ batch A3 │ │ batch B3 │ +/// │ │ │ │ │ │ +/// ├───────────┤ ├───────────┤ ├───────────┤ +/// │ │ │ │ │ │ +/// │ batch B2 │ │ batch B1 │ │ batch A2 │ +/// │ │ │ │ │ │ +/// └───────────┘ └───────────┘ └───────────┘ +/// ▲ ▲ ▲ +/// │ │ │ +/// └─────────┐ │ ┌──────────┘ +/// │ │ │ +/// │ │ │ +/// ┌─────────────────────────────────┐ batches are +/// │ RepartitionExec(3) │ repartitioned +/// │ RoundRobin │ +/// │ │ +/// └─────────────────────────────────┘ +/// ▲ ▲ +/// │ │ +/// ┌─────┘ └─────┐ +/// │ │ +/// │ │ +/// │ │ +/// ┌───────────┐ ┌───────────┐ +/// │ │ │ │ +/// │ batch A1 │ │ batch B1 │ +/// │ │ │ │ +/// ├───────────┤ ├───────────┤ +/// │ │ │ │ +/// │ batch A2 │ │ batch B2 │ +/// │ │ │ │ +/// ├───────────┤ ├───────────┤ +/// │ │ │ │ +/// │ batch A3 │ │ batch B3 │ +/// │ │ │ │ +/// └───────────┘ └───────────┘ +/// +/// +/// Input Input +/// A B +/// ``` #[derive(Default)] pub struct Repartition {} @@ -36,18 +128,53 @@ impl Repartition { } } +/// Recursively visits all `plan`s puts and then optionally adds a +/// `RepartitionExec` at the output of `plan` to match +/// `target_partitions` +/// +/// if `can_reorder` is false, means that the output of this node +/// can not be reordered as as something upstream is relying on that order +/// +/// If 'would_benefit` is false, the upstream operator doesn't +/// benefit from additional reordering +/// fn optimize_partitions( target_partitions: usize, plan: Arc, - should_repartition: bool, + can_reorder: bool, + would_benefit: bool, ) -> Result> { - // Recurse into children bottom-up (added nodes should be as deep as possible) + // Recurse into children bottom-up (attempt to repartition as + // early as possible) let new_plan = if plan.children().is_empty() { // leaf node - don't replace children plan.clone() } else { - let should_repartition_children = plan.should_repartition_children(); + let can_reorder_children = + match (plan.relies_on_input_order(), plan.maintains_input_order()) { + (true, _) => { + // `plan` itself relies on the order of its + // children, so don't reorder them! + false + } + (false, false) => { + // `plan` may reorder the input itself, so no need + // to preserve the order of any children + true + } + (false, true) => { + // `plan` will maintain the order, so we can only + // repartition children if it is ok to reorder the + // output of this node + let requires_single_partition = matches!( + plan.required_child_distribution(), + Distribution::SinglePartition + ); + can_reorder && !requires_single_partition + } + }; + let children = plan .children() .iter() @@ -55,14 +182,16 @@ fn optimize_partitions( optimize_partitions( target_partitions, child.clone(), - should_repartition_children, + can_reorder_children, + plan.benefits_from_input_partitioning(), ) }) .collect::>()?; plan.with_new_children(children)? }; - let perform_repartition = match new_plan.output_partitioning() { + // decide if we should bother trying to repartition the output of this plan + let could_repartition = match new_plan.output_partitioning() { // Apply when underlying node has less than `self.target_partitions` amount of concurrency RoundRobinBatch(x) => x < target_partitions, UnknownPartitioning(x) => x < target_partitions, @@ -75,7 +204,7 @@ fn optimize_partitions( // But also not very useful to inlude let is_empty_exec = plan.as_any().downcast_ref::().is_some(); - if perform_repartition && should_repartition && !is_empty_exec { + if would_benefit && could_repartition && can_reorder && !is_empty_exec { Ok(Arc::new(RepartitionExec::try_new( new_plan, RoundRobinBatch(target_partitions), @@ -95,7 +224,7 @@ impl PhysicalOptimizerRule for Repartition { if config.target_partitions == 1 { Ok(plan) } else { - optimize_partitions(config.target_partitions, plan, false) + optimize_partitions(config.target_partitions, plan, false, false) } } @@ -105,6 +234,7 @@ impl PhysicalOptimizerRule for Repartition { } #[cfg(test)] mod tests { + use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use super::*; @@ -114,6 +244,8 @@ mod tests { use crate::physical_plan::filter::FilterExec; use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; + use crate::physical_plan::projection::ProjectionExec; + use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::{displayable, Statistics}; @@ -153,6 +285,19 @@ mod tests { Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap()) } + fn sort_exec(input: Arc) -> Arc { + let sort_exprs = vec![PhysicalSortExpr { + expr: col("c1", &schema()).unwrap(), + options: SortOptions::default(), + }]; + Arc::new(SortExec::try_new(sort_exprs, input).unwrap()) + } + + fn projection_exec(input: Arc) -> Arc { + let exprs = vec![(col("c1", &schema()).unwrap(), "c1".to_string())]; + Arc::new(ProjectionExec::try_new(exprs, input).unwrap()) + } + fn hash_aggregate(input: Arc) -> Arc { let schema = schema(); Arc::new( @@ -190,38 +335,46 @@ mod tests { .collect() } + /// Runs the repartition optimizer and asserts the plan against the expected + macro_rules! assert_optimized { + ($EXPECTED_LINES: expr, $PLAN: expr) => { + let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); + + // run optimizer + let optimizer = Repartition {}; + let optimized = optimizer + .optimize($PLAN, &ExecutionConfig::new().with_target_partitions(10))?; + + // Now format correctly + let plan = displayable(optimized.as_ref()).indent().to_string(); + let actual_lines = trim_plan_display(&plan); + + assert_eq!( + &expected_lines, &actual_lines, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected_lines, actual_lines + ); + }; + } + #[test] fn added_repartition_to_single_partition() -> Result<()> { - let optimizer = Repartition {}; - - let optimized = optimizer.optimize( - hash_aggregate(parquet_exec()), - &ExecutionConfig::new().with_target_partitions(10), - )?; + let plan = hash_aggregate(parquet_exec()); - let plan = displayable(optimized.as_ref()).indent().to_string(); - - let expected = &[ + let expected = [ "HashAggregateExec: mode=Final, gby=[], aggr=[]", "HashAggregateExec: mode=Partial, gby=[], aggr=[]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions=[x]", ]; - assert_eq!(&trim_plan_display(&plan), &expected); + assert_optimized!(expected, plan); Ok(()) } #[test] fn repartition_deepest_node() -> Result<()> { - let optimizer = Repartition {}; - - let optimized = optimizer.optimize( - hash_aggregate(filter_exec(parquet_exec())), - &ExecutionConfig::new().with_target_partitions(10), - )?; - - let plan = displayable(optimized.as_ref()).indent().to_string(); + let plan = hash_aggregate(filter_exec(parquet_exec())); let expected = &[ "HashAggregateExec: mode=Final, gby=[], aggr=[]", @@ -231,20 +384,64 @@ mod tests { "ParquetExec: limit=None, partitions=[x]", ]; - assert_eq!(&trim_plan_display(&plan), &expected); + assert_optimized!(expected, plan); Ok(()) } #[test] - fn repartition_ignores_limit() -> Result<()> { - let optimizer = Repartition {}; + fn repartition_unsorted_limit() -> Result<()> { + let plan = limit_exec(filter_exec(parquet_exec())); - let optimized = optimizer.optimize( - hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec())))), - &ExecutionConfig::new().with_target_partitions(10), - )?; + let expected = &[ + "GlobalLimitExec: limit=100", + "LocalLimitExec: limit=100", + "FilterExec: c1@0", + // nothing sorts the data, so the local limit doesn't require sorted data either + "RepartitionExec: partitioning=RoundRobinBatch(10)", + "ParquetExec: limit=None, partitions=[x]", + ]; - let plan = displayable(optimized.as_ref()).indent().to_string(); + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_sorted_limit() -> Result<()> { + let plan = limit_exec(sort_exec(parquet_exec())); + + let expected = &[ + "GlobalLimitExec: limit=100", + "LocalLimitExec: limit=100", + // data is sorted so can't repartition here + "SortExec: [c1@0 ASC]", + "ParquetExec: limit=None, partitions=[x]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_sorted_limit_with_filter() -> Result<()> { + let plan = limit_exec(filter_exec(sort_exec(parquet_exec()))); + + let expected = &[ + "GlobalLimitExec: limit=100", + "LocalLimitExec: limit=100", + "FilterExec: c1@0", + // data is sorted so can't repartition here even though + // filter would benefit from parallelism, the answers might be wrong + "SortExec: [c1@0 ASC]", + "ParquetExec: limit=None, partitions=[x]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_ignores_limit() -> Result<()> { + let plan = hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec())))); let expected = &[ "HashAggregateExec: mode=Final, gby=[], aggr=[]", @@ -253,6 +450,7 @@ mod tests { "GlobalLimitExec: limit=100", "LocalLimitExec: limit=100", "FilterExec: c1@0", + // repartition should happen prior to the filter to maximize parallelism "RepartitionExec: partitioning=RoundRobinBatch(10)", "GlobalLimitExec: limit=100", "LocalLimitExec: limit=100", @@ -260,20 +458,15 @@ mod tests { "ParquetExec: limit=None, partitions=[x]", ]; - assert_eq!(&trim_plan_display(&plan), &expected); + assert_optimized!(expected, plan); Ok(()) } + // repartition works differently for limit when there is a sort below it + #[test] fn repartition_ignores_union() -> Result<()> { - let optimizer = Repartition {}; - - let optimized = optimizer.optimize( - Arc::new(UnionExec::new(vec![parquet_exec(); 5])), - &ExecutionConfig::new().with_target_partitions(5), - )?; - - let plan = displayable(optimized.as_ref()).indent().to_string(); + let plan = Arc::new(UnionExec::new(vec![parquet_exec(); 5])); let expected = &[ "UnionExec", @@ -285,28 +478,92 @@ mod tests { "ParquetExec: limit=None, partitions=[x]", ]; - assert_eq!(&trim_plan_display(&plan), &expected); + assert_optimized!(expected, plan); Ok(()) } #[test] fn repartition_ignores_sort_preserving_merge() -> Result<()> { - let optimizer = Repartition {}; + let plan = sort_preserving_merge_exec(parquet_exec()); + + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + // Expect no repartition of SortPreservingMergeExec + "ParquetExec: limit=None, partitions=[x]", + ]; - let optimized = optimizer.optimize( - sort_preserving_merge_exec(parquet_exec()), - &ExecutionConfig::new().with_target_partitions(5), - )?; + assert_optimized!(expected, plan); + Ok(()) + } - let plan = displayable(optimized.as_ref()).indent().to_string(); + #[test] + fn repartition_does_not_repartition_transitively() -> Result<()> { + let plan = sort_preserving_merge_exec(projection_exec(parquet_exec())); let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", // Expect no repartition of SortPreservingMergeExec + // even though there is a projection exec between it + "ProjectionExec: expr=[c1@0 as c1]", + "ParquetExec: limit=None, partitions=[x]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_transitively_past_sort_with_projection() -> Result<()> { + let plan = sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec()))); + + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + // Expect repartition on the input to the sort (as it can benefit from additional parallelism) + "SortExec: [c1@0 ASC]", + "ProjectionExec: expr=[c1@0 as c1]", + "RepartitionExec: partitioning=RoundRobinBatch(10)", + "ParquetExec: limit=None, partitions=[x]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_transitively_past_sort_with_filter() -> Result<()> { + let plan = sort_preserving_merge_exec(sort_exec(filter_exec(parquet_exec()))); + + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + // Expect repartition on the input to the sort (as it can benefit from additional parallelism) + "SortExec: [c1@0 ASC]", + "FilterExec: c1@0", + "RepartitionExec: partitioning=RoundRobinBatch(10)", + "ParquetExec: limit=None, partitions=[x]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> { + let plan = sort_preserving_merge_exec(sort_exec(projection_exec(filter_exec( + parquet_exec(), + )))); + + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + // Expect repartition on the input to the sort (as it can benefit from additional parallelism) + "SortExec: [c1@0 ASC]", + "ProjectionExec: expr=[c1@0 as c1]", + "FilterExec: c1@0", + // repartition is lowest down + "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions=[x]", ]; - assert_eq!(&trim_plan_display(&plan), &expected); + assert_optimized!(expected, plan); Ok(()) } } diff --git a/datafusion/src/physical_plan/analyze.rs b/datafusion/src/physical_plan/analyze.rs index 0a810b915945..a704bacf6f79 100644 --- a/datafusion/src/physical_plan/analyze.rs +++ b/datafusion/src/physical_plan/analyze.rs @@ -30,6 +30,7 @@ use crate::{ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use futures::StreamExt; +use super::expressions::PhysicalSortExpr; use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream}; use crate::execution::runtime_env::RuntimeEnv; use async_trait::async_trait; @@ -82,6 +83,10 @@ impl ExecutionPlan for AnalyzeExec { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn with_new_children( &self, mut children: Vec>, diff --git a/datafusion/src/physical_plan/coalesce_batches.rs b/datafusion/src/physical_plan/coalesce_batches.rs index ec238ad68cf8..746c74a90214 100644 --- a/datafusion/src/physical_plan/coalesce_batches.rs +++ b/datafusion/src/physical_plan/coalesce_batches.rs @@ -38,6 +38,7 @@ use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; use log::debug; +use super::expressions::PhysicalSortExpr; use super::metrics::{BaselineMetrics, MetricsSet}; use super::{metrics::ExecutionPlanMetricsSet, Statistics}; @@ -97,6 +98,10 @@ impl ExecutionPlan for CoalesceBatchesExec { self.input.output_partitioning() } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs index 2e3a5521d6ca..f5afcdb738e5 100644 --- a/datafusion/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/src/physical_plan/coalesce_partitions.rs @@ -31,6 +31,7 @@ use arrow::record_batch::RecordBatch; use arrow::{datatypes::SchemaRef, error::Result as ArrowResult}; use super::common::AbortOnDropMany; +use super::expressions::PhysicalSortExpr; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{RecordBatchStream, Statistics}; use crate::error::{DataFusionError, Result}; @@ -86,6 +87,10 @@ impl ExecutionPlan for CoalescePartitionsExec { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index e4369c180c85..7e02dd67f625 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -27,6 +27,7 @@ use arrow::record_batch::RecordBatch; use futures::{Stream, TryStreamExt}; +use super::expressions::PhysicalSortExpr; use super::{ coalesce_partitions::CoalescePartitionsExec, join_utils::check_join_is_valid, ColumnStatistics, Statistics, @@ -137,6 +138,10 @@ impl ExecutionPlan for CrossJoinExec { self.right.output_partitioning() } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + async fn execute( &self, partition: usize, diff --git a/datafusion/src/physical_plan/empty.rs b/datafusion/src/physical_plan/empty.rs index 33a09d97bbe8..9b1c71c0e0a8 100644 --- a/datafusion/src/physical_plan/empty.rs +++ b/datafusion/src/physical_plan/empty.rs @@ -28,6 +28,7 @@ use arrow::array::NullArray; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; +use super::expressions::PhysicalSortExpr; use super::{common, SendableRecordBatchStream, Statistics}; use crate::execution::runtime_env::RuntimeEnv; @@ -98,6 +99,10 @@ impl ExecutionPlan for EmptyExec { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/explain.rs b/datafusion/src/physical_plan/explain.rs index eb18926f9466..5d7cb3603aef 100644 --- a/datafusion/src/physical_plan/explain.rs +++ b/datafusion/src/physical_plan/explain.rs @@ -30,7 +30,7 @@ use crate::{ }; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; -use super::SendableRecordBatchStream; +use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream}; use crate::execution::runtime_env::RuntimeEnv; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics}; use async_trait::async_trait; @@ -89,6 +89,10 @@ impl ExecutionPlan for ExplainExec { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/file_format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs index b38968665015..ca94a506906b 100644 --- a/datafusion/src/physical_plan/file_format/avro.rs +++ b/datafusion/src/physical_plan/file_format/avro.rs @@ -19,6 +19,7 @@ #[cfg(feature = "avro")] use crate::avro_to_arrow; use crate::error::{DataFusionError, Result}; +use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; @@ -74,6 +75,10 @@ impl ExecutionPlan for AvroExec { Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { Vec::new() } diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index 4cf70f6e5cfd..797463db6446 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -18,6 +18,7 @@ //! Execution plan for reading CSV files use crate::error::{DataFusionError, Result}; +use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; @@ -88,6 +89,10 @@ impl ExecutionPlan for CsvExec { Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { // this is a leaf node and has no children vec![] diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs index ac413062caf8..3e6059f14c88 100644 --- a/datafusion/src/physical_plan/file_format/json.rs +++ b/datafusion/src/physical_plan/file_format/json.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use crate::error::{DataFusionError, Result}; use crate::execution::runtime_env::RuntimeEnv; +use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; @@ -65,6 +66,10 @@ impl ExecutionPlan for NdJsonExec { Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { Vec::new() } diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 40acf5a51c17..5663c6956b90 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -24,6 +24,7 @@ use std::{any::Any, convert::TryInto}; use crate::datasource::file_format::parquet::ChunkObjectReader; use crate::datasource::object_store::ObjectStore; use crate::datasource::PartitionedFile; +use crate::physical_plan::expressions::PhysicalSortExpr; use crate::{ error::{DataFusionError, Result}, logical_plan::{Column, Expr}, @@ -174,6 +175,10 @@ impl ExecutionPlan for ParquetExec { Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs index a48d11236dc8..a4746ff7db97 100644 --- a/datafusion/src/physical_plan/filter.rs +++ b/datafusion/src/physical_plan/filter.rs @@ -23,6 +23,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use super::expressions::PhysicalSortExpr; use super::{RecordBatchStream, SendableRecordBatchStream, Statistics}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ @@ -104,6 +105,15 @@ impl ExecutionPlan for FilterExec { self.input.output_partitioning() } + fn maintains_input_order(&self) -> bool { + // tell optimizer this operator doesn't reorder its input + true + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 2b1a59efb653..717c1ba17800 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -52,6 +52,7 @@ use crate::execution::runtime_env::RuntimeEnv; use async_trait::async_trait; use super::common::AbortOnDropSingle; +use super::expressions::PhysicalSortExpr; use super::metrics::{ self, BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput, }; @@ -208,6 +209,10 @@ impl ExecutionPlan for HashAggregateExec { self.input.output_partitioning() } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + async fn execute( &self, partition: usize, @@ -1148,6 +1153,10 @@ mod tests { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn with_new_children( &self, _: Vec>, diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 39479f9485e5..91f7765701a4 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -54,6 +54,7 @@ use hashbrown::raw::RawTable; use super::{ coalesce_partitions::CoalescePartitionsExec, + expressions::PhysicalSortExpr, join_utils::{build_join_schema, check_join_is_valid, ColumnIndex, JoinOn, JoinSide}, }; use super::{ @@ -278,6 +279,10 @@ impl ExecutionPlan for HashJoinExec { self.right.output_partitioning() } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + async fn execute( &self, partition: usize, diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs index 587780d9de4a..f150c5601294 100644 --- a/datafusion/src/physical_plan/limit.rs +++ b/datafusion/src/physical_plan/limit.rs @@ -35,6 +35,7 @@ use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; +use super::expressions::PhysicalSortExpr; use super::{ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -99,6 +100,22 @@ impl ExecutionPlan for GlobalLimitExec { Partitioning::UnknownPartitioning(1) } + fn relies_on_input_order(&self) -> bool { + self.input.output_ordering().is_some() + } + + fn maintains_input_order(&self) -> bool { + true + } + + fn benefits_from_input_partitioning(&self) -> bool { + false + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + fn with_new_children( &self, children: Vec>, @@ -232,6 +249,24 @@ impl ExecutionPlan for LocalLimitExec { self.input.output_partitioning() } + fn relies_on_input_order(&self) -> bool { + self.input.output_ordering().is_some() + } + + fn benefits_from_input_partitioning(&self) -> bool { + false + } + + // Local limit does not make any attempt to maintain the input + // sortedness (if there is more than one partition) + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + if self.output_partitioning().partition_count() == 1 { + self.input.output_ordering() + } else { + None + } + } + fn with_new_children( &self, children: Vec>, @@ -300,11 +335,6 @@ impl ExecutionPlan for LocalLimitExec { _ => Statistics::default(), } } - - fn should_repartition_children(&self) -> bool { - // No reason to repartition children as this node is just limiting each input partition. - false - } } /// Truncate a RecordBatch to maximum of n rows diff --git a/datafusion/src/physical_plan/memory.rs b/datafusion/src/physical_plan/memory.rs index 8e5f37953d75..5900e804f67b 100644 --- a/datafusion/src/physical_plan/memory.rs +++ b/datafusion/src/physical_plan/memory.rs @@ -22,6 +22,7 @@ use std::any::Any; use std::sync::Arc; use std::task::{Context, Poll}; +use super::expressions::PhysicalSortExpr; use super::{ common, project_schema, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -77,6 +78,10 @@ impl ExecutionPlan for MemoryExec { Partitioning::UnknownPartitioning(self.partitions.len()) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn with_new_children( &self, _: Vec>, diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index ac70f2f90ae2..2023a67bd0e9 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -147,24 +147,59 @@ pub trait ExecutionPlan: Debug + Send + Sync { Distribution::UnspecifiedDistribution } - /// Returns `true` if the direct children of this `ExecutionPlan` should be repartitioned - /// to introduce greater concurrency to the plan + /// Returns `true` if this operator relies on its inputs being + /// produced in a certain order (for example that they are sorted a particular way) for correctness. /// - /// The default implementation returns `true` unless `Self::required_child_distribution` - /// returns `Distribution::SinglePartition` + /// If `true` is returned, DataFusion will not apply certain + /// optimizations which might reorder the inputs (such as + /// repartitioning to increase concurrency). /// - /// Operators that do not benefit from additional partitioning may want to return `false` - fn should_repartition_children(&self) -> bool { - !matches!( - self.required_child_distribution(), - Distribution::SinglePartition - ) + /// The default implementation returns `false` + fn relies_on_input_order(&self) -> bool { + false } + /// Returns `false` if this operator's implementation may reorder + /// rows within or between partitions. + /// + /// For example, Projection, Filter, and Limit maintain the order + /// of inputs -- they may transform values (Projection) or not + /// produce the same number of rows that went in (Filter and + /// Limit), but the rows that are produced go in the same way. + /// + /// DataFusion uses this metadata to apply certain optimizations + /// such as automatically repartitioning correctly. + /// + /// The default implementation returns `false` + fn maintains_input_order(&self) -> bool { + false + } + + /// Returns `true` if this operator would benefit from + /// partitioning its input (and thus from more parallelism). For + /// operators that do very little work the overhead of extra + /// parallelism may outweigh any benefits + /// + /// The default implementation returns `true` + fn benefits_from_input_partitioning(&self) -> bool { + // give me MOAR CPUs + true + } + + /// If the output of this operator is sorted, returns `Some(keys)` + /// with the description of how it was sorted. + /// + /// For example, Sort, (obviously) produces sorted output as does + /// SortPreservingMergeStream. Less obviously `Projection` + /// produces sorted output if its input was sorted as it does not + /// reorder the input rows + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>; + /// Get a list of child execution plans that provide the input for this plan. The returned list /// will be empty for leaf nodes, will contain a single value for unary nodes, or two /// values for binary nodes (such as joins). fn children(&self) -> Vec>; + /// Returns a new plan where all children were replaced by new plans. /// The size of `children` must be equal to the size of `ExecutionPlan::children()`. fn with_new_children( diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index bf8be3df720b..cbe2c00d925a 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -1877,6 +1877,10 @@ mod tests { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { vec![] } diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs index 4d0cc61c99e2..e6cb3ff7ef86 100644 --- a/datafusion/src/physical_plan/projection.rs +++ b/datafusion/src/physical_plan/projection.rs @@ -34,7 +34,7 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; -use super::expressions::Column; +use super::expressions::{Column, PhysicalSortExpr}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{RecordBatchStream, SendableRecordBatchStream, Statistics}; use crate::execution::runtime_env::RuntimeEnv; @@ -122,6 +122,15 @@ impl ExecutionPlan for ProjectionExec { self.input.output_partitioning() } + fn maintains_input_order(&self) -> bool { + // tell optimizer this operator doesn't reorder its input + true + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs index 86866728cdda..d65b02928390 100644 --- a/datafusion/src/physical_plan/repartition.rs +++ b/datafusion/src/physical_plan/repartition.rs @@ -32,6 +32,7 @@ use arrow::{compute::take, datatypes::SchemaRef}; use tokio_stream::wrappers::UnboundedReceiverStream; use super::common::{AbortOnDropMany, AbortOnDropSingle}; +use super::expressions::PhysicalSortExpr; use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{RecordBatchStream, SendableRecordBatchStream}; use async_trait::async_trait; @@ -165,6 +166,10 @@ impl ExecutionPlan for RepartitionExec { self.partitioning.clone() } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + async fn execute( &self, partition: usize, diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index 7f7f58104fc8..87f09403bcbc 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -445,6 +445,14 @@ impl ExecutionPlan for SortExec { vec![self.input.clone()] } + fn benefits_from_input_partitioning(&self) -> bool { + false + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + Some(&self.expr) + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index d015e0c51f39..a948ceb975e0 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -128,10 +128,12 @@ impl ExecutionPlan for SortPreservingMergeExec { Distribution::UnspecifiedDistribution } - fn should_repartition_children(&self) -> bool { - // if the children are repartitioned they may no longer remain - // sorted - false + fn relies_on_input_order(&self) -> bool { + true + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + Some(&self.expr) } fn children(&self) -> Vec> { diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs index d2c170bc27f8..7eaa873add92 100644 --- a/datafusion/src/physical_plan/union.rs +++ b/datafusion/src/physical_plan/union.rs @@ -27,6 +27,7 @@ use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use futures::StreamExt; use super::{ + expressions::PhysicalSortExpr, metrics::{ExecutionPlanMetricsSet, MetricsSet}, ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -85,6 +86,10 @@ impl ExecutionPlan for UnionExec { Partitioning::UnknownPartitioning(num_partitions) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn with_new_children( &self, children: Vec>, @@ -144,7 +149,7 @@ impl ExecutionPlan for UnionExec { .unwrap_or_default() } - fn should_repartition_children(&self) -> bool { + fn benefits_from_input_partitioning(&self) -> bool { false } } diff --git a/datafusion/src/physical_plan/values.rs b/datafusion/src/physical_plan/values.rs index c3a7ea5c162c..387041dbce07 100644 --- a/datafusion/src/physical_plan/values.rs +++ b/datafusion/src/physical_plan/values.rs @@ -17,6 +17,7 @@ //! Values execution plan +use super::expressions::PhysicalSortExpr; use super::{common, SendableRecordBatchStream, Statistics}; use crate::error::{DataFusionError, Result}; use crate::execution::runtime_env::RuntimeEnv; @@ -119,6 +120,10 @@ impl ExecutionPlan for ValuesExec { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs b/datafusion/src/physical_plan/windows/window_agg_exec.rs index 491e0ebf45f8..752d6e263668 100644 --- a/datafusion/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs @@ -20,6 +20,7 @@ use crate::error::{DataFusionError, Result}; use crate::execution::runtime_env::RuntimeEnv; use crate::physical_plan::common::AbortOnDropSingle; +use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, }; @@ -114,6 +115,14 @@ impl ExecutionPlan for WindowAggExec { self.input.output_partitioning() } + fn maintains_input_order(&self) -> bool { + true + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + fn required_child_distribution(&self) -> Distribution { if self .window_expr() diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs index 39b8e5c11f5b..5a6b27865d13 100644 --- a/datafusion/src/test/exec.rs +++ b/datafusion/src/test/exec.rs @@ -33,7 +33,6 @@ use arrow::{ }; use futures::Stream; -use crate::execution::runtime_env::RuntimeEnv; use crate::physical_plan::{ common, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -42,6 +41,9 @@ use crate::{ error::{DataFusionError, Result}, physical_plan::stream::RecordBatchReceiverStream, }; +use crate::{ + execution::runtime_env::RuntimeEnv, physical_plan::expressions::PhysicalSortExpr, +}; /// Index into the data that has been returned so far #[derive(Debug, Default, Clone)] @@ -151,6 +153,10 @@ impl ExecutionPlan for MockExec { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { unimplemented!() } @@ -286,6 +292,10 @@ impl ExecutionPlan for BarrierExec { Partitioning::UnknownPartitioning(self.data.len()) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { unimplemented!() } @@ -383,6 +393,10 @@ impl ExecutionPlan for ErrorExec { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { unimplemented!() } @@ -459,6 +473,10 @@ impl ExecutionPlan for StatisticsExec { Partitioning::UnknownPartitioning(2) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { vec![] } @@ -560,6 +578,10 @@ impl ExecutionPlan for BlockingExec { Partitioning::UnknownPartitioning(self.n_partitions) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn with_new_children( &self, _: Vec>, diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index 0e7f733232fa..926a017f14af 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -22,6 +22,7 @@ use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use datafusion::from_slice::FromSlice; use datafusion::physical_plan::empty::EmptyExec; +use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::scalar::ScalarValue; use datafusion::{datasource::TableProvider, physical_plan::collect}; use datafusion::{ @@ -113,6 +114,9 @@ impl ExecutionPlan for CustomExecutionPlan { fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } fn children(&self) -> Vec> { vec![] } diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs index 3aac5a8f3662..203fb7ce56ff 100644 --- a/datafusion/tests/provider_filter_pushdown.rs +++ b/datafusion/tests/provider_filter_pushdown.rs @@ -25,6 +25,7 @@ use datafusion::execution::context::ExecutionContext; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_plan::Expr; use datafusion::physical_plan::common::SizedRecordBatchStream; +use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics}; use datafusion::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, @@ -69,6 +70,10 @@ impl ExecutionPlan for CustomPlan { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { vec![] } diff --git a/datafusion/tests/statistics.rs b/datafusion/tests/statistics.rs index 3bc3720c670e..c5fba894e686 100644 --- a/datafusion/tests/statistics.rs +++ b/datafusion/tests/statistics.rs @@ -25,8 +25,9 @@ use datafusion::{ error::{DataFusionError, Result}, logical_plan::Expr, physical_plan::{ - project_schema, ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, - SendableRecordBatchStream, Statistics, + expressions::PhysicalSortExpr, project_schema, ColumnStatistics, + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, + Statistics, }, prelude::ExecutionContext, scalar::ScalarValue, @@ -119,6 +120,10 @@ impl ExecutionPlan for StatisticsValidation { Partitioning::UnknownPartitioning(2) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { vec![] } diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index a4d9e97b1cf9..9b20b539283d 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -74,6 +74,7 @@ use datafusion::{ logical_plan::{Expr, LogicalPlan, UserDefinedLogicalNode}, optimizer::{optimizer::OptimizerRule, utils::optimize_children}, physical_plan::{ + expressions::PhysicalSortExpr, planner::{DefaultPhysicalPlanner, ExtensionPlanner}, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalPlanner, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -432,6 +433,10 @@ impl ExecutionPlan for TopKExec { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn required_child_distribution(&self) -> Distribution { Distribution::SinglePartition } From 9dd4524d4d3903e75907f2190de5b09d39ea5e34 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 7 Feb 2022 16:04:23 -0500 Subject: [PATCH 03/11] fix: fix ballitsa --- .../src/execution_plans/distributed_query.rs | 5 +++++ .../core/src/execution_plans/shuffle_reader.rs | 5 +++++ .../core/src/execution_plans/shuffle_writer.rs | 5 +++++ .../src/execution_plans/unresolved_shuffle.rs | 5 +++++ ballista/rust/executor/src/collect.rs | 5 +++++ datafusion/src/physical_plan/mod.rs | 18 +++++++++--------- 6 files changed, 34 insertions(+), 9 deletions(-) diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs index 619cc9bc925d..d96a5cd1b8ba 100644 --- a/ballista/rust/core/src/execution_plans/distributed_query.rs +++ b/ballista/rust/core/src/execution_plans/distributed_query.rs @@ -33,6 +33,7 @@ use crate::utils::WrappedStream; use datafusion::arrow::datatypes::{Schema, SchemaRef}; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::LogicalPlan; +use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -82,6 +83,10 @@ impl ExecutionPlan for DistributedQueryExec { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { vec![] } diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs index ea3381deb088..82aed041dbf4 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs @@ -28,6 +28,7 @@ use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::error::Result as ArrowResult; use datafusion::arrow::record_batch::RecordBatch; use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::metrics::{ ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; @@ -85,6 +86,10 @@ impl ExecutionPlan for ShuffleReaderExec { Partitioning::UnknownPartitioning(self.partition.len()) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { vec![] } diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index 724bb3518d74..023b9976b9f5 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -20,6 +20,7 @@ //! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query //! will use the ShuffleReaderExec to read these results. +use datafusion::physical_plan::expressions::PhysicalSortExpr; use parking_lot::Mutex; use std::fs::File; use std::iter::Iterator; @@ -334,6 +335,10 @@ impl ExecutionPlan for ShuffleWriterExec { self.plan.output_partitioning() } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { vec![self.plan.clone()] } diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs index e14c1ebf0e65..9e9020eb95f6 100644 --- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs +++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs @@ -23,6 +23,7 @@ use crate::serde::scheduler::PartitionLocation; use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; @@ -85,6 +86,10 @@ impl ExecutionPlan for UnresolvedShuffleExec { Partitioning::UnknownPartitioning(self.output_partition_count) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { vec![] } diff --git a/ballista/rust/executor/src/collect.rs b/ballista/rust/executor/src/collect.rs index 12c26ef58730..37a7f7bb0d1b 100644 --- a/ballista/rust/executor/src/collect.rs +++ b/ballista/rust/executor/src/collect.rs @@ -28,6 +28,7 @@ use datafusion::arrow::{ }; use datafusion::error::DataFusionError; use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; @@ -62,6 +63,10 @@ impl ExecutionPlan for CollectExec { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + fn children(&self) -> Vec> { vec![self.plan.clone()] } diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 2023a67bd0e9..5d1a474ed528 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -142,6 +142,15 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// Specifies the output partitioning scheme of this plan fn output_partitioning(&self) -> Partitioning; + /// If the output of this operator is sorted, returns `Some(keys)` + /// with the description of how it was sorted. + /// + /// For example, Sort, (obviously) produces sorted output as does + /// SortPreservingMergeStream. Less obviously `Projection` + /// produces sorted output if its input was sorted as it does not + /// reorder the input rows + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>; + /// Specifies the data distribution requirements of all the children for this operator fn required_child_distribution(&self) -> Distribution { Distribution::UnspecifiedDistribution @@ -186,15 +195,6 @@ pub trait ExecutionPlan: Debug + Send + Sync { true } - /// If the output of this operator is sorted, returns `Some(keys)` - /// with the description of how it was sorted. - /// - /// For example, Sort, (obviously) produces sorted output as does - /// SortPreservingMergeStream. Less obviously `Projection` - /// produces sorted output if its input was sorted as it does not - /// reorder the input rows - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>; - /// Get a list of child execution plans that provide the input for this plan. The returned list /// will be empty for leaf nodes, will contain a single value for unary nodes, or two /// values for binary nodes (such as joins). From 7c974b3964696f5e3537819cdc4cb193938bf1ec Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 8 Feb 2022 11:56:45 -0500 Subject: [PATCH 04/11] Update datafusion/src/physical_optimizer/repartition.rs Co-authored-by: xudong.w --- datafusion/src/physical_optimizer/repartition.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 30f4c5b0e038..d7db468ae543 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -201,7 +201,7 @@ fn optimize_partitions( }; // TODO: EmptyExec causes failures with RepartitionExec - // But also not very useful to inlude + // But also not very useful to include let is_empty_exec = plan.as_any().downcast_ref::().is_some(); if would_benefit && could_repartition && can_reorder && !is_empty_exec { From 09aa5ee7d87f92fce4f4ae7d8b254cd4892675ee Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 8 Feb 2022 11:57:28 -0500 Subject: [PATCH 05/11] Update datafusion/src/physical_optimizer/repartition.rs Co-authored-by: xudong.w --- datafusion/src/physical_optimizer/repartition.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index d7db468ae543..fae8b08cc2cd 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -136,7 +136,7 @@ impl Repartition { /// can not be reordered as as something upstream is relying on that order /// /// If 'would_benefit` is false, the upstream operator doesn't -/// benefit from additional reordering +/// benefit from additional repartition /// fn optimize_partitions( target_partitions: usize, From 64ffb84c3560b1aa128d2eff7d3bfceb7594252c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 8 Feb 2022 12:13:52 -0500 Subject: [PATCH 06/11] Add more comments --- datafusion/src/physical_optimizer/repartition.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index fae8b08cc2cd..385f1fe33492 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -130,7 +130,17 @@ impl Repartition { /// Recursively visits all `plan`s puts and then optionally adds a /// `RepartitionExec` at the output of `plan` to match -/// `target_partitions` +/// `target_partitions` in an attempt to increase the overall parallelism. +/// +/// It does so using depth first scan of the tree, and repartitions +/// any plan that: +/// +/// 1. Has fewer partitions than `target_partitions` +/// +/// 2. Has a direct parent that `benefits_from_input_partitioning` +/// +/// 3. Does not have a parent that `relies_on_input_order` unless there +/// is an intervening node that does not `maintain_input_order` /// /// if `can_reorder` is false, means that the output of this node /// can not be reordered as as something upstream is relying on that order From 785e0e09193642928f6e3dec5fd17e6ddccdf1f7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 8 Feb 2022 12:24:43 -0500 Subject: [PATCH 07/11] Remove special `EmptyExec` case --- datafusion/src/physical_optimizer/repartition.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 385f1fe33492..ba67a91d4b90 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -19,9 +19,7 @@ use std::sync::Arc; use super::optimizer::PhysicalOptimizerRule; -use crate::physical_plan::{ - empty::EmptyExec, repartition::RepartitionExec, ExecutionPlan, -}; +use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan}; use crate::physical_plan::{Distribution, Partitioning::*}; use crate::{error::Result, execution::context::ExecutionConfig}; @@ -159,7 +157,7 @@ fn optimize_partitions( let new_plan = if plan.children().is_empty() { // leaf node - don't replace children - plan.clone() + plan } else { let can_reorder_children = match (plan.relies_on_input_order(), plan.maintains_input_order()) { @@ -210,11 +208,7 @@ fn optimize_partitions( Hash(_, _) => false, }; - // TODO: EmptyExec causes failures with RepartitionExec - // But also not very useful to include - let is_empty_exec = plan.as_any().downcast_ref::().is_some(); - - if would_benefit && could_repartition && can_reorder && !is_empty_exec { + if would_benefit && could_repartition && can_reorder { Ok(Arc::new(RepartitionExec::try_new( new_plan, RoundRobinBatch(target_partitions), From 6583b7eb9559b8a1d5c15dcd3e480310974b13dd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 8 Feb 2022 12:34:30 -0500 Subject: [PATCH 08/11] restore default for benefits_from_input_partitioning --- datafusion/src/physical_plan/mod.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 5d1a474ed528..909322e71924 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -189,10 +189,15 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// operators that do very little work the overhead of extra /// parallelism may outweigh any benefits /// - /// The default implementation returns `true` + /// The default implementation returns `true` unless this operator + /// has signalled it requiers a single child input partition. fn benefits_from_input_partitioning(&self) -> bool { - // give me MOAR CPUs - true + // By default try to maximize parallelism with more CPUs if + // possible + !matches!( + self.required_child_distribution(), + Distribution::SinglePartition + ) } /// Get a list of child execution plans that provide the input for this plan. The returned list From 55aef2cf9c6c544064d3d9697b68bf7703da77f5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 8 Feb 2022 12:56:43 -0500 Subject: [PATCH 09/11] avoid unecessary check --- datafusion/src/physical_optimizer/repartition.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index ba67a91d4b90..540cb8c4800a 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use super::optimizer::PhysicalOptimizerRule; use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan}; -use crate::physical_plan::{Distribution, Partitioning::*}; +use crate::physical_plan::Partitioning::*; use crate::{error::Result, execution::context::ExecutionConfig}; /// Optimizer that introduces repartition to introduce more @@ -175,11 +175,7 @@ fn optimize_partitions( // `plan` will maintain the order, so we can only // repartition children if it is ok to reorder the // output of this node - let requires_single_partition = matches!( - plan.required_child_distribution(), - Distribution::SinglePartition - ); - can_reorder && !requires_single_partition + can_reorder } }; From a809bbddae56517722909de138734c6ea720315a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 8 Feb 2022 13:18:40 -0500 Subject: [PATCH 10/11] default relies_on_input_order to true --- .../src/execution_plans/distributed_query.rs | 4 ++++ .../src/execution_plans/shuffle_reader.rs | 4 ++++ .../src/execution_plans/shuffle_writer.rs | 4 ++++ .../src/execution_plans/unresolved_shuffle.rs | 4 ++++ .../src/physical_optimizer/repartition.rs | 2 +- datafusion/src/physical_plan/analyze.rs | 4 ++++ .../src/physical_plan/coalesce_batches.rs | 4 ++++ .../src/physical_plan/coalesce_partitions.rs | 4 ++++ datafusion/src/physical_plan/cross_join.rs | 4 ++++ datafusion/src/physical_plan/explain.rs | 4 ++++ .../src/physical_plan/file_format/avro.rs | 4 ++++ .../src/physical_plan/file_format/csv.rs | 4 ++++ .../src/physical_plan/file_format/json.rs | 4 ++++ .../src/physical_plan/file_format/parquet.rs | 4 ++++ datafusion/src/physical_plan/filter.rs | 8 +++++-- .../src/physical_plan/hash_aggregate.rs | 4 ++++ datafusion/src/physical_plan/hash_join.rs | 4 ++++ datafusion/src/physical_plan/memory.rs | 4 ++++ datafusion/src/physical_plan/mod.rs | 23 +++++++++++++++---- datafusion/src/physical_plan/planner.rs | 4 ++++ datafusion/src/physical_plan/projection.rs | 8 +++++-- datafusion/src/physical_plan/repartition.rs | 4 ++++ datafusion/src/physical_plan/sorts/sort.rs | 5 ++++ datafusion/src/physical_plan/union.rs | 4 ++++ datafusion/src/physical_plan/values.rs | 4 ++++ .../physical_plan/windows/window_agg_exec.rs | 8 +++++-- 26 files changed, 122 insertions(+), 12 deletions(-) diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs index d96a5cd1b8ba..22160158cb34 100644 --- a/ballista/rust/core/src/execution_plans/distributed_query.rs +++ b/ballista/rust/core/src/execution_plans/distributed_query.rs @@ -87,6 +87,10 @@ impl ExecutionPlan for DistributedQueryExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn children(&self) -> Vec> { vec![] } diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs index 82aed041dbf4..7482c1843ed3 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs @@ -90,6 +90,10 @@ impl ExecutionPlan for ShuffleReaderExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn children(&self) -> Vec> { vec![] } diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index 023b9976b9f5..fbce0653a9cc 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -339,6 +339,10 @@ impl ExecutionPlan for ShuffleWriterExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn children(&self) -> Vec> { vec![self.plan.clone()] } diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs index 9e9020eb95f6..90b7c7931917 100644 --- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs +++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs @@ -90,6 +90,10 @@ impl ExecutionPlan for UnresolvedShuffleExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn children(&self) -> Vec> { vec![] } diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 540cb8c4800a..ae074d2893da 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -19,8 +19,8 @@ use std::sync::Arc; use super::optimizer::PhysicalOptimizerRule; -use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan}; use crate::physical_plan::Partitioning::*; +use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan}; use crate::{error::Result, execution::context::ExecutionConfig}; /// Optimizer that introduces repartition to introduce more diff --git a/datafusion/src/physical_plan/analyze.rs b/datafusion/src/physical_plan/analyze.rs index a704bacf6f79..6857ad532273 100644 --- a/datafusion/src/physical_plan/analyze.rs +++ b/datafusion/src/physical_plan/analyze.rs @@ -87,6 +87,10 @@ impl ExecutionPlan for AnalyzeExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn with_new_children( &self, mut children: Vec>, diff --git a/datafusion/src/physical_plan/coalesce_batches.rs b/datafusion/src/physical_plan/coalesce_batches.rs index 746c74a90214..0d6fe38636f6 100644 --- a/datafusion/src/physical_plan/coalesce_batches.rs +++ b/datafusion/src/physical_plan/coalesce_batches.rs @@ -102,6 +102,10 @@ impl ExecutionPlan for CoalesceBatchesExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs index f5afcdb738e5..20b548733715 100644 --- a/datafusion/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/src/physical_plan/coalesce_partitions.rs @@ -91,6 +91,10 @@ impl ExecutionPlan for CoalescePartitionsExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index 7e02dd67f625..82ee5618f5f0 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -142,6 +142,10 @@ impl ExecutionPlan for CrossJoinExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + async fn execute( &self, partition: usize, diff --git a/datafusion/src/physical_plan/explain.rs b/datafusion/src/physical_plan/explain.rs index 5d7cb3603aef..0955655a1929 100644 --- a/datafusion/src/physical_plan/explain.rs +++ b/datafusion/src/physical_plan/explain.rs @@ -93,6 +93,10 @@ impl ExecutionPlan for ExplainExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/file_format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs index ca94a506906b..ba0873d78b2b 100644 --- a/datafusion/src/physical_plan/file_format/avro.rs +++ b/datafusion/src/physical_plan/file_format/avro.rs @@ -79,6 +79,10 @@ impl ExecutionPlan for AvroExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn children(&self) -> Vec> { Vec::new() } diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index 797463db6446..709705b5066d 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -89,6 +89,10 @@ impl ExecutionPlan for CsvExec { Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) } + fn relies_on_input_order(&self) -> bool { + false + } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { None } diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs index 3e6059f14c88..6c5ffcd99eac 100644 --- a/datafusion/src/physical_plan/file_format/json.rs +++ b/datafusion/src/physical_plan/file_format/json.rs @@ -70,6 +70,10 @@ impl ExecutionPlan for NdJsonExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn children(&self) -> Vec> { Vec::new() } diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 5663c6956b90..539904e58367 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -179,6 +179,10 @@ impl ExecutionPlan for ParquetExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs index a4746ff7db97..41250582e9de 100644 --- a/datafusion/src/physical_plan/filter.rs +++ b/datafusion/src/physical_plan/filter.rs @@ -105,13 +105,17 @@ impl ExecutionPlan for FilterExec { self.input.output_partitioning() } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + fn maintains_input_order(&self) -> bool { // tell optimizer this operator doesn't reorder its input true } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - self.input.output_ordering() + fn relies_on_input_order(&self) -> bool { + false } fn with_new_children( diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 717c1ba17800..b727cdd2e970 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -213,6 +213,10 @@ impl ExecutionPlan for HashAggregateExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + async fn execute( &self, partition: usize, diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 91f7765701a4..d276ac2e72de 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -283,6 +283,10 @@ impl ExecutionPlan for HashJoinExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + async fn execute( &self, partition: usize, diff --git a/datafusion/src/physical_plan/memory.rs b/datafusion/src/physical_plan/memory.rs index 5900e804f67b..cc8208346516 100644 --- a/datafusion/src/physical_plan/memory.rs +++ b/datafusion/src/physical_plan/memory.rs @@ -82,6 +82,10 @@ impl ExecutionPlan for MemoryExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn with_new_children( &self, _: Vec>, diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 909322e71924..455d9a5c3761 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -148,24 +148,33 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// For example, Sort, (obviously) produces sorted output as does /// SortPreservingMergeStream. Less obviously `Projection` /// produces sorted output if its input was sorted as it does not - /// reorder the input rows + /// reorder the input rows, + /// + /// It is safe to return `None` here if your operator does not + /// have any particular output order here fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>; - /// Specifies the data distribution requirements of all the children for this operator + /// Specifies the data distribution requirements of all the + /// children for this operator fn required_child_distribution(&self) -> Distribution { Distribution::UnspecifiedDistribution } /// Returns `true` if this operator relies on its inputs being - /// produced in a certain order (for example that they are sorted a particular way) for correctness. + /// produced in a certain order (for example that they are sorted + /// a particular way) for correctness. /// /// If `true` is returned, DataFusion will not apply certain /// optimizations which might reorder the inputs (such as /// repartitioning to increase concurrency). /// - /// The default implementation returns `false` + /// The default implementation returns `true` + /// + /// WARNING: if you override this default and return `false`, your + /// operator can not rely on datafusion preserving the input order + /// as it will likely not. fn relies_on_input_order(&self) -> bool { - false + true } /// Returns `false` if this operator's implementation may reorder @@ -180,6 +189,10 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// such as automatically repartitioning correctly. /// /// The default implementation returns `false` + /// + /// WARNING: if you override this default, you *MUST* ensure that + /// the operator's maintains the ordering invariant or else + /// DataFusion may produce incorrect results. fn maintains_input_order(&self) -> bool { false } diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index cbe2c00d925a..8674b61260a9 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -1881,6 +1881,10 @@ mod tests { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn children(&self) -> Vec> { vec![] } diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs index e6cb3ff7ef86..5940b64957c1 100644 --- a/datafusion/src/physical_plan/projection.rs +++ b/datafusion/src/physical_plan/projection.rs @@ -122,13 +122,17 @@ impl ExecutionPlan for ProjectionExec { self.input.output_partitioning() } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + fn maintains_input_order(&self) -> bool { // tell optimizer this operator doesn't reorder its input true } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - self.input.output_ordering() + fn relies_on_input_order(&self) -> bool { + false } fn with_new_children( diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs index d65b02928390..55328c40c951 100644 --- a/datafusion/src/physical_plan/repartition.rs +++ b/datafusion/src/physical_plan/repartition.rs @@ -147,6 +147,10 @@ impl ExecutionPlan for RepartitionExec { vec![self.input.clone()] } + fn relies_on_input_order(&self) -> bool { + false + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index 87f09403bcbc..1428e1627d8f 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -445,6 +445,11 @@ impl ExecutionPlan for SortExec { vec![self.input.clone()] } + fn relies_on_input_order(&self) -> bool { + // this operator resorts everything + false + } + fn benefits_from_input_partitioning(&self) -> bool { false } diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs index 7eaa873add92..03c41f4e1d60 100644 --- a/datafusion/src/physical_plan/union.rs +++ b/datafusion/src/physical_plan/union.rs @@ -90,6 +90,10 @@ impl ExecutionPlan for UnionExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/values.rs b/datafusion/src/physical_plan/values.rs index 387041dbce07..da39b3eb4bbc 100644 --- a/datafusion/src/physical_plan/values.rs +++ b/datafusion/src/physical_plan/values.rs @@ -124,6 +124,10 @@ impl ExecutionPlan for ValuesExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn with_new_children( &self, children: Vec>, diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs b/datafusion/src/physical_plan/windows/window_agg_exec.rs index 752d6e263668..163868d07838 100644 --- a/datafusion/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs @@ -115,12 +115,16 @@ impl ExecutionPlan for WindowAggExec { self.input.output_partitioning() } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + fn maintains_input_order(&self) -> bool { true } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - self.input.output_ordering() + fn relies_on_input_order(&self) -> bool { + true } fn required_child_distribution(&self) -> Distribution { From b6b662eaf6594d45b2f2631e7c66af84a97cf36f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 8 Feb 2022 13:59:54 -0500 Subject: [PATCH 11/11] fix test --- datafusion/tests/user_defined_plan.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index 9b20b539283d..17578047378a 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -437,6 +437,10 @@ impl ExecutionPlan for TopKExec { None } + fn relies_on_input_order(&self) -> bool { + false + } + fn required_child_distribution(&self) -> Distribution { Distribution::SinglePartition }