diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index aa5958fe6221..3d1c8d009418 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -842,6 +842,8 @@ mod tests { use super::*; use crate::execution::options::{CsvReadOptions, ParquetReadOptions}; use crate::physical_plan::ColumnarValue; + use crate::physical_plan::Partitioning; + use crate::physical_plan::PhysicalExpr; use crate::test_util; use crate::test_util::parquet_test_data; use crate::{assert_batches_sorted_eq, execution::context::SessionContext}; @@ -851,6 +853,7 @@ mod tests { avg, cast, count, count_distinct, create_udf, lit, max, min, sum, BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFunction, }; + use datafusion_physical_expr::expressions::Column; #[tokio::test] async fn select_columns() -> Result<()> { @@ -1515,4 +1518,163 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn partition_aware_union() -> Result<()> { + let left = test_table().await?.select_columns(&["c1", "c2"])?; + let right = test_table_with_name("c2") + .await? + .select_columns(&["c1", "c3"])? + .with_column_renamed("c2.c1", "c2_c1")?; + + let left_rows = left.collect().await?; + let right_rows = right.collect().await?; + let join1 = + left.join(right.clone(), JoinType::Inner, &["c1"], &["c2_c1"], None)?; + let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?; + + let union = join1.union(join2)?; + + let union_rows = union.collect().await?; + + assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>()); + assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>()); + assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>()); + + let physical_plan = union.create_physical_plan().await?; + let default_partition_count = + SessionContext::new().copied_config().target_partitions; + + // For a partition aware union, the output partition count should not be changed. + assert_eq!( + physical_plan.output_partitioning().partition_count(), + default_partition_count + ); + // For a partition aware union, the output partitioning is the same as that of the union's inputs + for child in physical_plan.children() { + assert_eq!( + physical_plan.output_partitioning(), + child.output_partitioning() + ); + } + + Ok(()) + } + + #[tokio::test] + async fn non_partition_aware_union() -> Result<()> { + let left = test_table().await?.select_columns(&["c1", "c2"])?; + let right = test_table_with_name("c2") + .await? + .select_columns(&["c1", "c2"])? + .with_column_renamed("c2.c1", "c2_c1")?
+ .with_column_renamed("c2.c2", "c2_c2")?; + + let left_rows = left.collect().await?; + let right_rows = right.collect().await?; + let join1 = left.join( + right.clone(), + JoinType::Inner, + &["c1", "c2"], + &["c2_c1", "c2_c2"], + None, + )?; + + // join key ordering is different + let join2 = left.join( + right, + JoinType::Inner, + &["c2", "c1"], + &["c2_c2", "c2_c1"], + None, + )?; + + let union = join1.union(join2)?; + + let union_rows = union.collect().await?; + + assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>()); + assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>()); + assert_eq!(916, union_rows.iter().map(|x| x.num_rows()).sum::<usize>()); + + let physical_plan = union.create_physical_plan().await?; + let default_partition_count = + SessionContext::new().copied_config().target_partitions; + + // For a non-partition aware union, the output partition count should be the sum of all the input partition counts + assert!(matches!( + physical_plan.output_partitioning(), + Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2)); + Ok(()) + } + + #[tokio::test] + async fn verify_join_output_partitioning() -> Result<()> { + let left = test_table().await?.select_columns(&["c1", "c2"])?; + let right = test_table_with_name("c2") + .await? + .select_columns(&["c1", "c2"])? + .with_column_renamed("c2.c1", "c2_c1")? + .with_column_renamed("c2.c2", "c2_c2")?; + + let all_join_types = vec![ + JoinType::Inner, + JoinType::Left, + JoinType::Right, + JoinType::Full, + JoinType::LeftSemi, + JoinType::RightSemi, + JoinType::LeftAnti, + JoinType::RightAnti, + ]; + + let default_partition_count = + SessionContext::new().copied_config().target_partitions; + + for join_type in all_join_types { + let join = left.join( + right.clone(), + join_type, + &["c1", "c2"], + &["c2_c1", "c2_c2"], + None, + )?; + let physical_plan = join.create_physical_plan().await?; + let out_partitioning = physical_plan.output_partitioning(); + let join_schema = physical_plan.schema(); + + match join_type { + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti => { + let left_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![ + Arc::new(Column::new_with_schema("c1", &join_schema).unwrap()), + Arc::new(Column::new_with_schema("c2", &join_schema).unwrap()), + ]; + assert_eq!( + out_partitioning, + Partitioning::Hash(left_exprs, default_partition_count) + ); + } + JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + let right_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![ + Arc::new(Column::new_with_schema("c2_c1", &join_schema).unwrap()), + Arc::new(Column::new_with_schema("c2_c2", &join_schema).unwrap()), + ]; + assert_eq!( + out_partitioning, + Partitioning::Hash(right_exprs, default_partition_count) + ); + } + JoinType::Full => { + assert!(matches!( + out_partitioning, + Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count)); + } + } + } + + Ok(()) + } } diff --git a/datafusion/core/src/physical_optimizer/merge_exec.rs b/datafusion/core/src/physical_optimizer/merge_exec.rs index 960d0582fd81..1ed0f6196e75 100644 --- a/datafusion/core/src/physical_optimizer/merge_exec.rs +++ b/datafusion/core/src/physical_optimizer/merge_exec.rs @@ -52,27 +52,20 @@ impl PhysicalOptimizerRule for AddCoalescePartitionsExec { .iter() .map(|child| self.optimize(child.clone(), _config)) .collect::<Result<Vec<_>>>()?; - match plan.required_child_distribution() { - Distribution::UnspecifiedDistribution => { - with_new_children_if_necessary(plan, children) - } -
Distribution::HashPartitioned(_) => { - with_new_children_if_necessary(plan, children) - } - Distribution::SinglePartition => with_new_children_if_necessary( - plan, - children - .iter() - .map(|child| { - if child.output_partitioning().partition_count() == 1 { - child.clone() - } else { - Arc::new(CoalescePartitionsExec::new(child.clone())) - } - }) - .collect(), - ), - } + assert_eq!(children.len(), plan.required_input_distribution().len()); + let new_children = children + .into_iter() + .zip(plan.required_input_distribution()) + .map(|(child, dist)| match dist { + Distribution::SinglePartition + if child.output_partitioning().partition_count() > 1 => + { + Arc::new(CoalescePartitionsExec::new(child.clone())) + } + _ => child, + }) + .collect::<Vec<_>>(); + with_new_children_if_necessary(plan, new_children) } } diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index e453b65cc94c..6fda8f77bf6d 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -37,6 +37,7 @@ use datafusion_physical_expr::{ expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr, }; use std::any::Any; +use std::collections::HashMap; use std::sync::Arc; @@ -45,9 +46,11 @@ mod no_grouping; mod row_hash; use crate::physical_plan::aggregates::row_hash::GroupedHashAggregateStreamV2; +use crate::physical_plan::EquivalenceProperties; pub use datafusion_expr::AggregateFunction; use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator; pub use datafusion_physical_expr::expressions::create_aggregate_expr; +use datafusion_physical_expr::normalize_out_expr_with_alias_schema; use datafusion_row::{row_supported, RowType}; /// Hash aggregate modes @@ -163,6 +166,9 @@ pub struct AggregateExec { /// same as input.schema() but for the final aggregate it will be the same as the input /// to the partial aggregate input_schema: SchemaRef, + /// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr + /// The key is the column from the input schema and the values are the columns from the output schema + alias_map: HashMap<Column, Vec<Column>>, /// Execution Metrics metrics: ExecutionPlanMetricsSet, } @@ -186,6 +192,18 @@ impl AggregateExec { let schema = Arc::new(schema); + let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new(); + for (expression, name) in group_by.expr.iter() { + if let Some(column) = expression.as_any().downcast_ref::<Column>() { + let new_col_idx = schema.index_of(name)?; + // When the column name is the same but the index differs, treat it as an alias + if (column.name() != name) || (column.index() != new_col_idx) { + let entry = alias_map.entry(column.clone()).or_insert_with(Vec::new); + entry.push(Column::new(name, new_col_idx)); + } + }; + } + Ok(AggregateExec { mode, group_by, + input, schema, input_schema, + alias_map, metrics: ExecutionPlanMetricsSet::new(), }) } @@ -255,25 +274,51 @@ impl ExecutionPlan for AggregateExec { /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { - self.input.output_partitioning() + match &self.mode { + AggregateMode::Partial => { + // Partial aggregation will not change the output partitioning, but it needs to respect the alias + let input_partition = self.input.output_partitioning(); + match input_partition { + Partitioning::Hash(exprs, part) => { + let normalized_exprs = exprs + .into_iter() + .map(|expr| { + normalize_out_expr_with_alias_schema( + expr,
&self.alias_map, + &self.schema, + ) + }) + .collect::<Vec<_>>(); + Partitioning::Hash(normalized_exprs, part) + } + _ => input_partition, + } + } + // Final aggregation's output partitioning is the same as its real input + _ => self.input.output_partitioning(), + } } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { None } - fn required_child_distribution(&self) -> Distribution { + fn required_input_distribution(&self) -> Vec<Distribution> { match &self.mode { - AggregateMode::Partial => Distribution::UnspecifiedDistribution, - AggregateMode::FinalPartitioned => Distribution::HashPartitioned( - self.group_by.expr.iter().map(|x| x.0.clone()).collect(), - ), - AggregateMode::Final => Distribution::SinglePartition, + AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution], + AggregateMode::FinalPartitioned => { + vec![Distribution::HashPartitioned(self.output_group_expr())] + } + AggregateMode::Final => vec![Distribution::SinglePartition], } } - fn relies_on_input_order(&self) -> bool { - false + fn equivalence_properties(&self) -> EquivalenceProperties { + let mut input_equivalence_properties = self.input.equivalence_properties(); + input_equivalence_properties.merge_properties_with_alias(&self.alias_map); + input_equivalence_properties.truncate_properties_not_in_schema(&self.schema); + input_equivalence_properties } fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs index 8134ee7d2f2d..b0578bd486de 100644 --- a/datafusion/core/src/physical_plan/analyze.rs +++ b/datafusion/core/src/physical_plan/analyze.rs @@ -72,8 +72,8 @@ impl ExecutionPlan for AnalyzeExec { } /// Specifies we want the input as a single stream - fn required_child_distribution(&self) -> Distribution { - Distribution::SinglePartition + fn required_input_distribution(&self) -> Vec<Distribution> { + vec![Distribution::SinglePartition] } /// Get the output partitioning of this plan @@ -85,10 +85,6 @@ None } - fn relies_on_input_order(&self) -> bool { - false - } - fn with_new_children( self: Arc<Self>, mut children: Vec<Arc<dyn ExecutionPlan>>, diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index 317500ddc904..e7c492732d19 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -25,8 +25,8 @@ use std::task::{Context, Poll}; use crate::error::Result; use crate::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, + DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, + RecordBatchStream, SendableRecordBatchStream, }; use crate::execution::context::TaskContext; @@ -100,8 +100,8 @@ impl ExecutionPlan for CoalesceBatchesExec { None } - fn relies_on_input_order(&self) -> bool { - false + fn equivalence_properties(&self) -> EquivalenceProperties { + self.input.equivalence_properties() } fn with_new_children( diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs index d1c797eacd5c..816a9c9403c6 100644 --- a/datafusion/core/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs @@ -33,7 +33,9 @@ use super::expressions::PhysicalSortExpr; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{RecordBatchStream, Statistics}; use crate::error::{DataFusionError, Result}; -use
crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; +use crate::physical_plan::{ + DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, +}; use super::SendableRecordBatchStream; use crate::execution::context::TaskContext; @@ -87,8 +89,8 @@ impl ExecutionPlan for CoalescePartitionsExec { None } - fn relies_on_input_order(&self) -> bool { - false + fn equivalence_properties(&self) -> EquivalenceProperties { + self.input.equivalence_properties() } fn with_new_children( diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs index c693764c87aa..4751dade1dda 100644 --- a/datafusion/core/src/physical_plan/empty.rs +++ b/datafusion/core/src/physical_plan/empty.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ - memory::MemoryStream, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning, }; use arrow::array::NullArray; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -98,10 +98,6 @@ impl ExecutionPlan for EmptyExec { vec![] } - fn required_child_distribution(&self) -> Distribution { - Distribution::UnspecifiedDistribution - } - /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(self.partitions) diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs index 15f459fb045b..ac350b1837e8 100644 --- a/datafusion/core/src/physical_plan/explain.rs +++ b/datafusion/core/src/physical_plan/explain.rs @@ -97,10 +97,6 @@ impl ExecutionPlan for ExplainExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn with_new_children( self: Arc<Self>, _: Vec<Arc<dyn ExecutionPlan>>, diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index 2aab84fadbcf..38178a976804 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -76,10 +76,6 @@ impl ExecutionPlan for AvroExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { Vec::new() } diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index d086a7798235..51180c0f00a8 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -109,10 +109,6 @@ impl ExecutionPlan for CsvExec { Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) } - fn relies_on_input_order(&self) -> bool { - false - } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { None } diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index c8c5d71bd73f..ceb9e7958934 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -91,10 +91,6 @@ impl ExecutionPlan for NdJsonExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { Vec::new() } diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 24159a337643..8f0f4010ac91 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -307,10
+307,6 @@ impl ExecutionPlan for ParquetExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn with_new_children( self: Arc<Self>, _: Vec<Arc<dyn ExecutionPlan>>, diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs index b4e3edaee05f..17a2355d0578 100644 --- a/datafusion/core/src/physical_plan/filter.rs +++ b/datafusion/core/src/physical_plan/filter.rs @@ -28,13 +28,17 @@ use super::{RecordBatchStream, SendableRecordBatchStream, Statistics}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, - DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, + Column, DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, + PhysicalExpr, }; use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::BinaryExpr; +use datafusion_physical_expr::split_conjunction; use log::debug; @@ -113,8 +117,14 @@ impl ExecutionPlan for FilterExec { true } - fn relies_on_input_order(&self) -> bool { - false + fn equivalence_properties(&self) -> EquivalenceProperties { + // Combine the equal predicates with the input equivalence properties + let mut input_properties = self.input.equivalence_properties(); + let (equal_pairs, _ne_pairs) = collect_columns_from_predicate(&self.predicate); + for new_condition in equal_pairs { + input_properties.add_equal_conditions(new_condition) + } + input_properties } fn with_new_children( @@ -231,6 +241,38 @@ impl RecordBatchStream for FilterExecStream { } } +/// Return the equal column pairs and the non-equal column pairs found in the predicate +fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual { + let mut eq_predicate_columns: Vec<(&Column, &Column)> = Vec::new(); + let mut ne_predicate_columns: Vec<(&Column, &Column)> = Vec::new(); + + let predicates = split_conjunction(predicate); + predicates.into_iter().for_each(|p| { + if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() { + let left = binary.left(); + let right = binary.right(); + if left.as_any().is::<Column>() && right.as_any().is::<Column>() { + let left_column = left.as_any().downcast_ref::<Column>().unwrap(); + let right_column = right.as_any().downcast_ref::<Column>().unwrap(); + match binary.op() { + Operator::Eq => { + eq_predicate_columns.push((left_column, right_column)) + } + Operator::NotEq => { + ne_predicate_columns.push((left_column, right_column)) + } + _ => {} + } + } + } + }); + + (eq_predicate_columns, ne_predicate_columns) +} +/// The equal column pairs and the non-equal column pairs in the predicates +pub type EqualAndNonEqual<'a> = + (Vec<(&'a Column, &'a Column)>, Vec<(&'a Column, &'a Column)>); + #[cfg(test)] mod tests { @@ -295,4 +337,47 @@ Ok(()) } + + #[tokio::test] + async fn collect_columns_predicates() -> Result<()> { + let schema = test_util::aggr_test_schema(); + let predicate: Arc<dyn PhysicalExpr> = binary( + binary( + binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?, + Operator::And, + binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?, + &schema, + )?, + Operator::And, + binary( + binary( + col("c2", &schema)?, + Operator::Eq, + col("c9", &schema)?, + &schema, + )?, + Operator::And, + binary( + col("c1", &schema)?, + Operator::NotEq, + col("c13", &schema)?, + &schema, + )?, + &schema, + )?, + &schema, + )?; + + let (equal_pairs, ne_pairs) =
collect_columns_from_predicate(&predicate); + + assert_eq!(1, equal_pairs.len()); + assert_eq!(equal_pairs[0].0.name(), "c2"); + assert_eq!(equal_pairs[0].1.name(), "c9"); + + assert_eq!(1, ne_pairs.len()); + assert_eq!(ne_pairs[0].0.name(), "c1"); + assert_eq!(ne_pairs[0].1.name(), "c13"); + + Ok(()) + } } diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs index 7a35116a4658..a71e06ccec7a 100644 --- a/datafusion/core/src/physical_plan/joins/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -29,16 +29,19 @@ use arrow::record_batch::RecordBatch; use crate::execution::context::TaskContext; use crate::physical_plan::{ coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, - ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, Statistics, + ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan, + Partitioning, PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream, + Statistics, }; use crate::{error::Result, scalar::ScalarValue}; use async_trait::async_trait; -use datafusion_physical_expr::PhysicalSortExpr; use log::debug; use std::time::Instant; -use super::utils::{check_join_is_valid, OnceAsync, OnceFut}; +use super::utils::{ + adjust_right_output_partitioning, check_join_is_valid, + cross_join_equivalence_properties, OnceAsync, OnceFut, +}; /// Data of the left side type JoinLeftData = RecordBatch; @@ -153,16 +156,27 @@ impl ExecutionPlan for CrossJoinExec { )?)) } + // TODO optimize CrossJoin implementation to generate M * N partitions fn output_partitioning(&self) -> Partitioning { - self.right.output_partitioning() + let left_columns_len = self.left.schema().fields.len(); + adjust_right_output_partitioning( + self.right.output_partitioning(), + left_columns_len, + ) } + // TODO check the output ordering of CrossJoin fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { None } - fn relies_on_input_order(&self) -> bool { - false + fn equivalence_properties(&self) -> EquivalenceProperties { + let left_columns_len = self.left.schema().fields.len(); + cross_join_equivalence_properties( + self.left.equivalence_properties(), + self.right.equivalence_properties(), + left_columns_len, + ) } fn execute( diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 839a24112a0e..07b4d6f4a109 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -62,12 +62,13 @@ use crate::physical_plan::{ expressions::PhysicalSortExpr, hash_utils::create_hashes, joins::utils::{ - build_join_schema, check_join_is_valid, estimate_join_statistics, ColumnIndex, - JoinFilter, JoinOn, JoinSide, + adjust_right_output_partitioning, build_join_schema, check_join_is_valid, + combine_join_equivalence_properties, estimate_join_statistics, + partitioned_join_output_partitioning, ColumnIndex, JoinFilter, JoinOn, JoinSide, }, metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, - DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, - SendableRecordBatchStream, Statistics, + DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, + PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::error::{DataFusionError, Result}; @@ -270,6 +271,75 @@ impl ExecutionPlan for HashJoinExec { 
self.schema.clone() } + fn required_input_distribution(&self) -> Vec<Distribution> { + match self.mode { + PartitionMode::CollectLeft => vec![ + Distribution::SinglePartition, + Distribution::UnspecifiedDistribution, + ], + PartitionMode::Partitioned => { + let (left_expr, right_expr) = self + .on + .iter() + .map(|(l, r)| { + ( + Arc::new(l.clone()) as Arc<dyn PhysicalExpr>, + Arc::new(r.clone()) as Arc<dyn PhysicalExpr>, + ) + }) + .unzip(); + vec![ + Distribution::HashPartitioned(left_expr), + Distribution::HashPartitioned(right_expr), + ] + } + } + } + + fn output_partitioning(&self) -> Partitioning { + let left_columns_len = self.left.schema().fields.len(); + match self.mode { + PartitionMode::CollectLeft => match self.join_type { + JoinType::Inner | JoinType::Right => adjust_right_output_partitioning( + self.right.output_partitioning(), + left_columns_len, + ), + JoinType::RightSemi | JoinType::RightAnti => { + self.right.output_partitioning() + } + JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::Full => Partitioning::UnknownPartitioning( + self.right.output_partitioning().partition_count(), + ), + }, + PartitionMode::Partitioned => partitioned_join_output_partitioning( + self.join_type, + self.left.output_partitioning(), + self.right.output_partitioning(), + left_columns_len, + ), + } + } + + // TODO Output ordering might be kept for some cases. + // For example if it is inner join then the stream side order can be kept + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn equivalence_properties(&self) -> EquivalenceProperties { + let left_columns_len = self.left.schema().fields.len(); + combine_join_equivalence_properties( + self.join_type, + self.left.equivalence_properties(), + self.right.equivalence_properties(), + left_columns_len, + self.on(), + ) + } + fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { vec![self.left.clone(), self.right.clone()] } @@ -289,18 +359,6 @@ impl ExecutionPlan for HashJoinExec { )?)) } - fn output_partitioning(&self) -> Partitioning { - self.right.output_partitioning() - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - fn relies_on_input_order(&self) -> bool { - false - } - fn execute( &self, partition: usize, diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index 92392455e440..44771ba4c27b 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -43,14 +43,17 @@ use crate::physical_plan::common::combine_batches; use crate::physical_plan::expressions::Column; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::joins::utils::{ - build_join_schema, check_join_is_valid, JoinOn, + build_join_schema, check_join_is_valid, combine_join_equivalence_properties, + partitioned_join_output_partitioning, JoinOn, }; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use crate::physical_plan::{ - metrics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, Statistics, + metrics, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, + Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use datafusion_physical_expr::rewrite::TreeNodeRewritable; + /// join execution plan executes partitions in parallel and combines them into a set of partitions.
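As context for `adjust_right_output_partitioning` (used in the hash join hunk above and defined in joins/utils.rs below), here is a minimal, self-contained sketch of the re-indexing idea, assuming a simplified stand-in `Column` type rather than DataFusion's real physical `Column`: because the join output schema lays out the left fields followed by the right fields, a right-side column at index `i` must be rewritten to index `left_columns_len + i`.

```rust
/// Stand-in for datafusion's physical Column (a name plus an index into the schema).
#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
    index: usize,
}

/// Shift a right-side column into the joined schema, which is [left fields..., right fields...].
fn shift_right_column(col: &Column, left_columns_len: usize) -> Column {
    Column {
        name: col.name.clone(),
        index: left_columns_len + col.index,
    }
}

fn main() {
    // The left side has 2 columns; "c2_c1" sits at index 0 of the right schema,
    // so in the joined schema it lives at index 2.
    let col = Column { name: "c2_c1".into(), index: 0 };
    assert_eq!(shift_right_column(&col, 2).index, 2);
}
```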
#[derive(Debug)] @@ -67,6 +70,12 @@ pub struct SortMergeJoinExec { schema: SchemaRef, /// Execution metrics metrics: ExecutionPlanMetricsSet, + /// The left SortExpr + left_sort_exprs: Vec<PhysicalSortExpr>, + /// The right SortExpr + right_sort_exprs: Vec<PhysicalSortExpr>, + /// The output ordering + output_ordering: Option<Vec<PhysicalSortExpr>>, /// Sort options of join columns used in sorting left and right execution plans sort_options: Vec<SortOptions>, /// If null_equals_null is true, null == null else null != null @@ -104,6 +113,75 @@ impl SortMergeJoinExec { ))); } + let (left_expr, right_expr): (Vec<_>, Vec<_>) = on + .iter() + .map(|(l, r)| { + ( + Arc::new(l.clone()) as Arc<dyn PhysicalExpr>, + Arc::new(r.clone()) as Arc<dyn PhysicalExpr>, + ) + }) + .unzip(); + + let left_sort_exprs = left_expr + .into_iter() + .zip(sort_options.iter()) + .map(|(k, sort_op)| PhysicalSortExpr { + expr: k, + options: *sort_op, + }) + .collect::<Vec<_>>(); + + let right_sort_exprs = right_expr + .into_iter() + .zip(sort_options.iter()) + .map(|(k, sort_op)| PhysicalSortExpr { + expr: k, + options: *sort_op, + }) + .collect::<Vec<_>>(); + + let output_ordering = match join_type { + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti => { + left.output_ordering().map(|sort_exprs| sort_exprs.to_vec()) + } + JoinType::RightSemi | JoinType::RightAnti => right + .output_ordering() + .map(|sort_exprs| sort_exprs.to_vec()), + JoinType::Right => { + let left_columns_len = left.schema().fields.len(); + right.output_ordering().map(|sort_exprs| { + sort_exprs + .iter() + .map(|e| { + let new_expr = e + .expr + .clone() + .transform_down(&|e| match e + .as_any() + .downcast_ref::<Column>() + { + Some(col) => Some(Arc::new(Column::new( + col.name(), + left_columns_len + col.index(), + ))), + None => None, + }) + .unwrap(); + PhysicalSortExpr { + expr: new_expr, + options: e.options, + } + }) + .collect::<Vec<_>>() + }) + } + JoinType::Full => None, + }; + let schema = Arc::new(build_join_schema(&left_schema, &right_schema, &join_type).0); @@ -114,10 +192,18 @@ impl SortMergeJoinExec { join_type, schema, metrics: ExecutionPlanMetricsSet::new(), + left_sort_exprs, + right_sort_exprs, + output_ordering, sort_options, null_equals_null, }) } + + /// Set of common columns used to join on + pub fn on(&self) -> &[(Column, Column)] { + &self.on + } } impl ExecutionPlan for SortMergeJoinExec { @@ -129,25 +215,50 @@ self.schema.clone() } + fn required_input_distribution(&self) -> Vec<Distribution> { + let (left_expr, right_expr) = self + .on + .iter() + .map(|(l, r)| { + ( + Arc::new(l.clone()) as Arc<dyn PhysicalExpr>, + Arc::new(r.clone()) as Arc<dyn PhysicalExpr>, + ) + }) + .unzip(); + vec![ + Distribution::HashPartitioned(left_expr), + Distribution::HashPartitioned(right_expr), + ] + } + + fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> { + vec![Some(&self.left_sort_exprs), Some(&self.right_sort_exprs)] + } + fn output_partitioning(&self) -> Partitioning { - self.right.output_partitioning() + let left_columns_len = self.left.schema().fields.len(); + partitioned_join_output_partitioning( + self.join_type, + self.left.output_partitioning(), + self.right.output_partitioning(), + left_columns_len, + ) } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - match self.join_type { - JoinType::Inner - | JoinType::Left - | JoinType::LeftSemi - | JoinType::LeftAnti => self.left.output_ordering(), - JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { - self.right.output_ordering() - } - JoinType::Full => None, - } + self.output_ordering.as_deref() } - fn relies_on_input_order(&self) -> bool { - true + fn
equivalence_properties(&self) -> EquivalenceProperties { + let left_columns_len = self.left.schema().fields.len(); + combine_join_equivalence_properties( + self.join_type, + self.left.equivalence_properties(), + self.right.equivalence_properties(), + left_columns_len, + self.on(), + ) } fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { @@ -228,8 +339,8 @@ impl ExecutionPlan for SortMergeJoinExec { DisplayFormatType::Default => { write!( f, - "SortMergeJoin: join_type={:?}, on={:?}, schema={:?}", - self.join_type, self.on, &self.schema + "SortMergeJoin: join_type={:?}, on={:?}", + self.join_type, self.on ) } } diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index d041e7dfb6c9..905e59de966f 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -23,7 +23,7 @@ use crate::physical_plan::expressions::Column; use arrow::datatypes::{Field, Schema}; use arrow::error::ArrowError; use datafusion_common::ScalarValue; -use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::{EquivalentClass, PhysicalExpr}; use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; use parking_lot::Mutex; @@ -33,7 +33,10 @@ use std::future::Future; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics}; +use crate::physical_plan::{ + ColumnStatistics, EquivalenceProperties, ExecutionPlan, Partitioning, Statistics, +}; +use datafusion_physical_expr::rewrite::TreeNodeRewritable; /// The on clause of the join, as vector of (left, right) columns. pub type JoinOn = Vec<(Column, Column)>; @@ -83,6 +86,128 @@ fn check_join_set_is_valid( Ok(()) } +/// Calculate the OutputPartitioning for Partitioned Join +pub fn partitioned_join_output_partitioning( + join_type: JoinType, + left_partitioning: Partitioning, + right_partitioning: Partitioning, + left_columns_len: usize, +) -> Partitioning { + match join_type { + JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => { + left_partitioning + } + JoinType::RightSemi | JoinType::RightAnti => right_partitioning, + JoinType::Right => { + adjust_right_output_partitioning(right_partitioning, left_columns_len) + } + JoinType::Full => { + Partitioning::UnknownPartitioning(right_partitioning.partition_count()) + } + } +} + +/// Adjust the right output partitioning to the new column index +pub fn adjust_right_output_partitioning( + right_partitioning: Partitioning, + left_columns_len: usize, +) -> Partitioning { + match right_partitioning { + Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size), + Partitioning::UnknownPartitioning(size) => { + Partitioning::UnknownPartitioning(size) + } + Partitioning::Hash(exprs, size) => { + let new_exprs = exprs + .into_iter() + .map(|expr| { + expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() { + Some(col) => Some(Arc::new(Column::new( + col.name(), + left_columns_len + col.index(), + ))), + None => None, + }) + .unwrap() + }) + .collect::<Vec<_>>(); + Partitioning::Hash(new_exprs, size) + } + } +} + +/// Combine the Equivalence Properties for Join Node +pub fn combine_join_equivalence_properties( + join_type: JoinType, + left_properties: EquivalenceProperties, + right_properties: EquivalenceProperties, + left_columns_len: usize, + on: &[(Column, Column)], +) -> EquivalenceProperties { + let mut new_properties = match join_type { + JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => {
+ let mut left_properties = left_properties; + let new_right_properties = right_properties + .classes() + .iter() + .map(|prop| { + let new_head = Column::new( + prop.head().name(), + left_columns_len + prop.head().index(), + ); + let new_others = prop + .others() + .iter() + .map(|col| { + Column::new(col.name(), left_columns_len + col.index()) + }) + .collect::<Vec<_>>(); + EquivalentClass::new(new_head, new_others) + }) + .collect::<Vec<_>>(); + + left_properties.extend(new_right_properties); + left_properties + } + JoinType::LeftSemi | JoinType::LeftAnti => left_properties, + JoinType::RightSemi | JoinType::RightAnti => right_properties, + }; + + if join_type == JoinType::Inner { + on.iter().for_each(|(column1, column2)| { + let new_column2 = + Column::new(column2.name(), left_columns_len + column2.index()); + new_properties.add_equal_conditions((column1, &new_column2)) + }) + } + new_properties +} + +/// Calculate the Equivalence Properties for CrossJoin Node +pub fn cross_join_equivalence_properties( + left_properties: EquivalenceProperties, + right_properties: EquivalenceProperties, + left_columns_len: usize, +) -> EquivalenceProperties { + let mut left_properties = left_properties; + let new_right_properties = right_properties + .classes() + .iter() + .map(|prop| { + let new_head = + Column::new(prop.head().name(), left_columns_len + prop.head().index()); + let new_others = prop + .others() + .iter() + .map(|col| Column::new(col.name(), left_columns_len + col.index())) + .collect::<Vec<_>>(); + EquivalentClass::new(new_head, new_others) + }) + .collect::<Vec<_>>(); + left_properties.extend(new_right_properties); + left_properties +} + /// Used in ColumnIndex to distinguish which side the index is for #[derive(Debug, Clone)] pub enum JoinSide { diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index 322c21ff419c..171e2b2f0f64 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -27,7 +27,7 @@ use std::task::{Context, Poll}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, }; use arrow::array::ArrayRef; use arrow::compute::limit; @@ -98,10 +98,9 @@ impl ExecutionPlan for GlobalLimitExec { vec![self.input.clone()] } - fn required_child_distribution(&self) -> Distribution { - Distribution::SinglePartition + fn required_input_distribution(&self) -> Vec<Distribution> { + vec![Distribution::SinglePartition] } - /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(1) @@ -123,6 +122,10 @@ self.input.output_ordering() } + fn equivalence_properties(&self) -> EquivalenceProperties { + self.input.equivalence_properties() + } + fn with_new_children( self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, @@ -281,14 +284,13 @@ impl ExecutionPlan for LocalLimitExec { false } - // Local limit does not make any attempt to maintain the input - // sortedness (if there is more than one partition) + // Local limit will not change the input plan's ordering fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - if self.output_partitioning().partition_count() == 1 { - self.input.output_ordering() - } else { - None - } + self.input.output_ordering() + } + + fn equivalence_properties(&self) -> EquivalenceProperties { + self.input.equivalence_properties() } fn with_new_children( diff
--git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs index 698eaf12a273..7753a5ba7765 100644 --- a/datafusion/core/src/physical_plan/memory.rs +++ b/datafusion/core/src/physical_plan/memory.rs @@ -81,10 +81,6 @@ impl ExecutionPlan for MemoryExec { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn with_new_children( self: Arc<Self>, _: Vec<Arc<dyn ExecutionPlan>>, diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 55b46c9915a7..87df26781da9 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -122,10 +122,20 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// have any particular output order here fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>; - /// Specifies the data distribution requirements of all the - /// children for this operator - fn required_child_distribution(&self) -> Distribution { - Distribution::UnspecifiedDistribution + /// Specifies the data distribution requirements for all the + /// children of this operator. By default it is [`Distribution::UnspecifiedDistribution`] for each child. + fn required_input_distribution(&self) -> Vec<Distribution> { + if !self.children().is_empty() { + vec![Distribution::UnspecifiedDistribution; self.children().len()] + } else { + vec![Distribution::UnspecifiedDistribution] + } + } + + /// Specifies the ordering requirements for all the + /// children of this operator. + fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> { + vec![None; self.children().len()] } /// Returns `true` if this operator relies on its inputs being /// produced in a certain order (for example that they are sorted /// a particular way) for correctness. /// /// If `true` is returned, DataFusion will not apply certain /// optimizations which might reorder the inputs (such as /// repartitioning to increase concurrency). /// - /// The default implementation returns `true` + /// The default implementation checks the input ordering requirements + /// and returns `true` if any child has a non-empty ordering requirement. /// /// WARNING: if you override this default and return `false`, your /// operator can not rely on DataFusion preserving the input order /// as it will likely not. fn relies_on_input_order(&self) -> bool { - true + self.required_input_ordering() + .iter() + .any(|ordering| matches!(ordering, Some(_))) } /// Returns `false` if this operator's implementation may reorder @@ -175,10 +189,15 @@ fn benefits_from_input_partitioning(&self) -> bool { // By default try to maximize parallelism with more CPUs if // possible - !matches!( - self.required_child_distribution(), - Distribution::SinglePartition - ) + !self + .required_input_distribution() + .into_iter() + .any(|dist| matches!(dist, Distribution::SinglePartition)) + } + + /// Get the EquivalenceProperties within the plan + fn equivalence_properties(&self) -> EquivalenceProperties { + EquivalenceProperties::new() } /// Get a list of child execution plans that provide the input for this plan.
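To make the new trait defaults above concrete, here is a minimal, self-contained sketch (with simplified stand-in types, not DataFusion's real `ExecutionPlan` or `PhysicalSortExpr`) of how `relies_on_input_order` is now derived from `required_input_ordering` instead of being hand-written per operator:

```rust
/// Stand-in for an ordering requirement (&[PhysicalSortExpr] in DataFusion).
#[derive(Clone)]
struct SortRequirement;

trait Plan {
    /// One entry per child: Some(_) means that child must arrive ordered.
    fn required_input_ordering(&self) -> Vec<Option<SortRequirement>>;

    /// Derived default: the operator relies on input order exactly when
    /// any child has a concrete ordering requirement.
    fn relies_on_input_order(&self) -> bool {
        self.required_input_ordering()
            .iter()
            .any(|ordering| ordering.is_some())
    }
}

/// An operator like SortPreservingMergeExec: its single child must be sorted.
struct MergeLike;
impl Plan for MergeLike {
    fn required_input_ordering(&self) -> Vec<Option<SortRequirement>> {
        vec![Some(SortRequirement)]
    }
}

fn main() {
    assert!(MergeLike.relies_on_input_order());
}
```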
The returned list @@ -460,6 +479,23 @@ impl Partitioning { } } +impl PartialEq for Partitioning { + fn eq(&self, other: &Partitioning) -> bool { + match (self, other) { + ( + Partitioning::RoundRobinBatch(count1), + Partitioning::RoundRobinBatch(count2), + ) if count1 == count2 => true, + (Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2)) + if expr_list_eq_strict_order(exprs1, exprs2) && (count1 == count2) => + { + true + } + _ => false, + } + } +} + /// Distribution schemes #[derive(Debug, Clone)] pub enum Distribution { @@ -472,7 +508,10 @@ pub enum Distribution { HashPartitioned(Vec<Arc<dyn PhysicalExpr>>), } +use datafusion_physical_expr::expr_list_eq_strict_order; +use datafusion_physical_expr::expressions::Column; pub use datafusion_physical_expr::window::WindowExpr; +use datafusion_physical_expr::EquivalenceProperties; pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr}; /// Applies an optional projection to a [`SchemaRef`], returning the diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 17e09497fafe..a34f16b0a658 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -522,8 +522,9 @@ impl DefaultPhysicalPlanner { && session_state.config.target_partitions > 1 && session_state.config.repartition_windows; - let input_exec = if can_repartition { - let partition_keys = partition_keys + let physical_partition_keys = if can_repartition + { + partition_keys .iter() .map(|e| { self.create_physical_expr( + e, + input_dfschema, + &input_exec.schema(), + session_state, ) }) - .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?; + .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()? + } else { + vec![] + }; + + let input_exec = if can_repartition { Arc::new(RepartitionExec::try_new( input_exec, Partitioning::Hash( - partition_keys, + physical_partition_keys.clone(), session_state.config.target_partitions, ), )?) @@ -576,8 +582,8 @@ let logical_input_schema = input.schema(); - let input_exec = if sort_keys.is_empty() { - input_exec + let physical_sort_keys = if sort_keys.is_empty() { + None } else { let physical_input_schema = input_exec.schema(); let sort_keys = sort_keys @@ -600,13 +606,19 @@ _ => unreachable!(), }) .collect::<Result<Vec<_>>>()?; - Arc::new(if can_repartition { - SortExec::new_with_partitioning(sort_keys, input_exec, true, None) - } else { - SortExec::try_new(sort_keys, input_exec, None)? - }) + Some(sort_keys) }; + let input_exec = match physical_sort_keys.clone() { + None => input_exec, + Some(sort_exprs) => { + if can_repartition { + Arc::new(SortExec::new_with_partitioning(sort_exprs, input_exec, true, None)) + } else { + Arc::new(SortExec::try_new(sort_exprs, input_exec, None)?) + } + }, + }; let physical_input_schema = input_exec.schema(); let window_expr = window_expr .iter() @@ -624,6 +636,8 @@ window_expr, input_exec, physical_input_schema, + physical_partition_keys, + physical_sort_keys, )?)) } LogicalPlan::Aggregate(Aggregate { @@ -2306,10 +2320,6 @@ mod tests { None } - fn relies_on_input_order(&self) -> bool { - false - } - fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { vec![] } diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index 5fa3c93cdd42..2b6297f8c9f5 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -21,14 +21,15 @@ //! projection expressions. `SELECT` without `FROM` will only evaluate expressions.
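As context for the alias handling in the projection hunk that follows: a minimal, self-contained sketch of how an alias map is built from a projection's (expression, output name) pairs. The `Column` struct and the `build_alias_map` helper here are simplified stand-ins for illustration, not the PR's API; the real code does the equivalent walk over `expr.iter()` inside `ProjectionExec::try_new` and `AggregateExec::try_new`.

```rust
use std::collections::HashMap;

/// Stand-in for datafusion's physical Column (name + index into the schema).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Column {
    name: String,
    index: usize,
}

/// Any input column whose output name or output index differs is treated as
/// an alias; the map records all output columns each input column maps to.
fn build_alias_map(
    projected: &[(Column, String, usize)], // (input column, output name, output index)
) -> HashMap<Column, Vec<Column>> {
    let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
    for (col, out_name, out_idx) in projected {
        if col.name != *out_name || col.index != *out_idx {
            alias_map
                .entry(col.clone())
                .or_insert_with(Vec::new)
                .push(Column { name: out_name.clone(), index: *out_idx });
        }
    }
    alias_map
}

fn main() {
    // SELECT a, a AS a1 -- input column a (index 0) gains the alias a1 (index 1)
    let a = Column { name: "a".into(), index: 0 };
    let map = build_alias_map(&[
        (a.clone(), "a".into(), 0),
        (a.clone(), "a1".into(), 1),
    ]);
    assert_eq!(map[&a], vec![Column { name: "a1".into(), index: 1 }]);
}
```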
use std::any::Any; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use crate::error::Result; use crate::physical_plan::{ - ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, + ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan, + Partitioning, PhysicalExpr, }; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; @@ -39,6 +40,7 @@ use super::expressions::{Column, PhysicalSortExpr}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{RecordBatchStream, SendableRecordBatchStream, Statistics}; use crate::execution::context::TaskContext; +use datafusion_physical_expr::normalize_out_expr_with_alias_schema; use futures::stream::Stream; use futures::stream::StreamExt; @@ -51,6 +53,11 @@ pub struct ProjectionExec { schema: SchemaRef, /// The input plan input: Arc<dyn ExecutionPlan>, + /// The output ordering + output_ordering: Option<Vec<PhysicalSortExpr>>, + /// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr + /// The key is the column from the input schema and the values are the columns from the output schema + alias_map: HashMap<Column, Vec<Column>>, /// Execution metrics metrics: ExecutionPlanMetricsSet, } @@ -82,10 +89,47 @@ impl ProjectionExec { input_schema.metadata().clone(), )); + let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new(); + for (expression, name) in expr.iter() { + if let Some(column) = expression.as_any().downcast_ref::<Column>() { + let new_col_idx = schema.index_of(name)?; + // When the column name is the same but the index differs, treat it as an alias + if (column.name() != name) || (column.index() != new_col_idx) { + let entry = alias_map.entry(column.clone()).or_insert_with(Vec::new); + entry.push(Column::new(name, new_col_idx)); + } + }; + } + + // The output ordering needs to respect the alias + let child_output_ordering = input.output_ordering(); + let output_ordering = match child_output_ordering { + Some(sort_exprs) => { + let normalized_exprs = sort_exprs + .iter() + .map(|sort_expr| { + let expr = normalize_out_expr_with_alias_schema( + sort_expr.expr.clone(), + &alias_map, + &schema, + ); + PhysicalSortExpr { + expr, + options: sort_expr.options, + } + }) + .collect::<Vec<_>>(); + Some(normalized_exprs) + } + None => None, + }; + Ok(Self { expr, schema, input: input.clone(), + output_ordering, + alias_map, metrics: ExecutionPlanMetricsSet::new(), }) } @@ -118,11 +162,28 @@ impl ExecutionPlan for ProjectionExec { /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { - self.input.output_partitioning() + // The output partitioning needs to respect the alias + let input_partition = self.input.output_partitioning(); + match input_partition { + Partitioning::Hash(exprs, part) => { + let normalized_exprs = exprs + .into_iter() + .map(|expr| { + normalize_out_expr_with_alias_schema( + expr, + &self.alias_map, + &self.schema, + ) + }) + .collect::<Vec<_>>(); + Partitioning::Hash(normalized_exprs, part) + } + _ => input_partition, + } } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - self.input.output_ordering() + self.output_ordering.as_deref() } fn maintains_input_order(&self) -> bool { @@ -130,8 +191,15 @@ true } - fn relies_on_input_order(&self) -> bool { - false + // Equivalence properties need to be adjusted after the Projection.
+ // 1) Add aliases: an alias can introduce additional equivalence properties, + // For example: Projection(a, a as a1, a as a2) + // 2) Truncate the properties that are not in the schema of the Projection + fn equivalence_properties(&self) -> EquivalenceProperties { + let mut input_equivalence_properties = self.input.equivalence_properties(); + input_equivalence_properties.merge_properties_with_alias(&self.alias_map); + input_equivalence_properties.truncate_properties_not_in_schema(&self.schema); + input_equivalence_properties } fn with_new_children( diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs index 4c057b1d6c52..3e50ec645288 100644 --- a/datafusion/core/src/physical_plan/repartition.rs +++ b/datafusion/core/src/physical_plan/repartition.rs @@ -25,7 +25,9 @@ use std::{any::Any, vec}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::hash_utils::create_hashes; -use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics}; +use crate::physical_plan::{ + DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, Statistics, +}; use arrow::array::{ArrayRef, UInt64Builder}; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; @@ -272,10 +274,6 @@ impl ExecutionPlan for RepartitionExec { vec![self.input.clone()] } - fn relies_on_input_order(&self) -> bool { - false - } - fn with_new_children( self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, @@ -294,6 +292,10 @@ None } + fn equivalence_properties(&self) -> EquivalenceProperties { + self.input.equivalence_properties() + } + fn execute( &self, partition: usize, diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index cd14f27b6b23..fb289b68b5bb 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -46,6 +46,7 @@ use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion_physical_expr::EquivalenceProperties; use futures::lock::Mutex; use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use log::{debug, error}; @@ -743,11 +744,13 @@ impl ExecutionPlan for SortExec { } } - fn required_child_distribution(&self) -> Distribution { + fn required_input_distribution(&self) -> Vec<Distribution> { if self.preserve_partitioning { - Distribution::UnspecifiedDistribution + vec![Distribution::UnspecifiedDistribution] } else { - Distribution::SinglePartition + // global sort + // TODO support RangePartition and OrderedDistribution + vec![Distribution::SinglePartition] } } @@ -755,11 +758,6 @@ vec![self.input.clone()] } - fn relies_on_input_order(&self) -> bool { - // this operator resorts everything - false - } - fn benefits_from_input_partitioning(&self) -> bool { false } @@ -768,6 +766,10 @@ Some(&self.expr) } + fn equivalence_properties(&self) -> EquivalenceProperties { + self.input.equivalence_properties() + } + fn with_new_children( self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index a8c6fe4e6a93..73a507037b91 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -49,6 +49,7 @@ use
crate::physical_plan::{ Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use datafusion_physical_expr::EquivalenceProperties; /// Sort preserving merge execution plan /// @@ -123,18 +124,22 @@ impl ExecutionPlan for SortPreservingMergeExec { Partitioning::UnknownPartitioning(1) } - fn required_child_distribution(&self) -> Distribution { - Distribution::UnspecifiedDistribution + fn required_input_distribution(&self) -> Vec<Distribution> { + vec![Distribution::UnspecifiedDistribution] } - fn relies_on_input_order(&self) -> bool { - true + fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> { + vec![Some(&self.expr)] } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { Some(&self.expr) } + fn equivalence_properties(&self) -> EquivalenceProperties { + self.input.equivalence_properties() + } + fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { vec![self.input.clone()] } diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index bf9dfbd1b694..af57c9ef9cc2 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -21,15 +21,19 @@ //! The Union operator combines multiple inputs with the same schema +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{any::Any, sync::Arc}; +use arrow::error::Result as ArrowResult; use arrow::{ datatypes::{Field, Schema, SchemaRef}, record_batch::RecordBatch, }; -use futures::StreamExt; +use futures::{Stream, StreamExt}; use itertools::Itertools; use log::debug; +use log::warn; use super::{ expressions::PhysicalSortExpr, @@ -42,6 +46,8 @@ use crate::{ error::Result, physical_plan::{expressions, metrics::BaselineMetrics}, }; +use datafusion_physical_expr::sort_expr_list_eq_strict_order; +use tokio::macros::support::thread_rng_n; /// UNION ALL execution plan #[derive(Debug)] @@ -52,6 +58,8 @@ pub struct UnionExec { metrics: ExecutionPlanMetricsSet, /// Schema of Union schema: SchemaRef, + /// Partition aware Union + partition_aware: bool, } impl UnionExec { @@ -78,10 +86,24 @@ impl UnionExec { inputs[0].schema().metadata().clone(), )); + // If all the input partitions have the same Hash partition spec as the first_input_partition, + // the UnionExec is partition aware. + // + // This might be too strict, since input partition specs can be compatible without being identical. + // For example, one input may have the partition spec Hash('a','b','c') while + // another has the partition spec Hash('a'); it would still be safe to derive the output partitioning with the spec Hash('a','b','c').
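A minimal, self-contained sketch of the partition-awareness check implemented just below, assuming a simplified `Partitioning` enum with keys by name (the real code compares DataFusion's `Partitioning` values via the `PartialEq` impl added in mod.rs above):

```rust
/// Simplified stand-in for datafusion's Partitioning.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    Hash(Vec<String>, usize), // hash keys (by name) and partition count
    Unknown(usize),
}

/// The union is partition aware when every input is hash partitioned
/// exactly like the first input.
fn is_partition_aware(inputs: &[Partitioning]) -> bool {
    let first = &inputs[0];
    matches!(first, Partitioning::Hash(_, _)) && inputs.iter().all(|p| p == first)
}

fn main() {
    let h = Partitioning::Hash(vec!["c1".into()], 8);
    assert!(is_partition_aware(&[h.clone(), h.clone()]));
    assert!(!is_partition_aware(&[h, Partitioning::Unknown(8)]));
}
```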
+ let first_input_partition = inputs[0].output_partitioning(); + let partition_aware = matches!(first_input_partition, Partitioning::Hash(_, _)) + && inputs + .iter() + .map(|plan| plan.output_partitioning()) + .all(|partition| partition == first_input_partition); + UnionExec { inputs, metrics: ExecutionPlanMetricsSet::new(), schema, + partition_aware, } } @@ -107,23 +129,46 @@ impl ExecutionPlan for UnionExec { /// Output of the union is the combination of all output partitions of the inputs fn output_partitioning(&self) -> Partitioning { - // Sums all the output partitions - let num_partitions = self - .inputs - .iter() - .map(|plan| plan.output_partitioning().partition_count()) - .sum(); - // TODO: this loses partitioning info in case of same partitioning scheme (for example `Partitioning::Hash`) - // https://issues.apache.org/jira/browse/ARROW-11991 - Partitioning::UnknownPartitioning(num_partitions) + if self.partition_aware { + self.inputs[0].output_partitioning() + } else { + // Output the combination of all output partitions of the inputs if the Union is not partition aware + let num_partitions = self + .inputs + .iter() + .map(|plan| plan.output_partitioning().partition_count()) + .sum(); + + Partitioning::UnknownPartitioning(num_partitions) + } } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - fn relies_on_input_order(&self) -> bool { - false + let first_input_ordering = self.inputs[0].output_ordering(); + // If the Union is not partition aware and all the input orderings are strictly equal to the first_input_ordering, + // return the first_input_ordering as the output_ordering. + // + // This might be too strict, since input orderings can be compatible without being identical. + // For example, one input may have the ordering spec SortExpr('a','b','c') while the other has the ordering + // spec SortExpr('a'); it would still be safe to derive the output ordering with the spec SortExpr('a').
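A sketch of the "strictly equal ordering" rule applied below, with a simplified sort-expression type; the real code calls `sort_expr_list_eq_strict_order` on DataFusion's `PhysicalSortExpr` slices, and the `common_output_ordering` helper here is hypothetical:

```rust
/// Simplified stand-in for datafusion's PhysicalSortExpr.
#[derive(Debug, Clone, PartialEq)]
struct SortExpr {
    column: String,
    descending: bool,
}

/// Return the shared output ordering only when every input reports exactly
/// the same ordering as the first input; otherwise report no ordering.
fn common_output_ordering(orderings: &[Option<Vec<SortExpr>>]) -> Option<Vec<SortExpr>> {
    let first = orderings.first()?.clone()?;
    orderings
        .iter()
        .all(|o| o.as_deref() == Some(first.as_slice()))
        .then(|| first)
}

fn main() {
    let ord = vec![SortExpr { column: "a".into(), descending: false }];
    assert_eq!(
        common_output_ordering(&[Some(ord.clone()), Some(ord.clone())]),
        Some(ord)
    );
    assert_eq!(common_output_ordering(&[None, None]), None);
}
```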
+ if !self.partition_aware + && first_input_ordering.is_some() + && self + .inputs + .iter() + .map(|plan| plan.output_ordering()) + .all(|ordering| { + ordering.is_some() + && sort_expr_list_eq_strict_order( + ordering.unwrap(), + first_input_ordering.unwrap(), + ) + }) + { + first_input_ordering + } else { + None + } } fn with_new_children( @@ -145,19 +190,38 @@ let elapsed_compute = baseline_metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); // record on drop - // find partition to execute - for input in self.inputs.iter() { - // Calculate whether partition belongs to the current partition - if partition < input.output_partitioning().partition_count() { - let stream = input.execute(partition, context)?; - debug!("Found a Union partition to execute"); + if self.partition_aware { + let mut input_stream_vec = vec![]; + for input in self.inputs.iter() { + if partition < input.output_partitioning().partition_count() { + input_stream_vec.push(input.execute(partition, context.clone())?); + } else { + // Did not find a partition to execute + break; + } + } + if input_stream_vec.len() == self.inputs.len() { + let stream = Box::pin(CombinedRecordBatchStream::new( + self.schema(), + input_stream_vec, + )); return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); - } else { - partition -= input.output_partitioning().partition_count(); + } + } else { + // find partition to execute + for input in self.inputs.iter() { + // Calculate whether partition belongs to the current partition + if partition < input.output_partitioning().partition_count() { + let stream = input.execute(partition, context)?; + debug!("Found a Union partition to execute"); + return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); + } else { + partition -= input.output_partitioning().partition_count(); + } } } - debug!("Error in Union: Partition {} not found", partition); + warn!("Error in Union: Partition {} not found", partition); Err(crate::error::DataFusionError::Execution(format!( "Partition {} not found in Union", @@ -194,6 +258,73 @@ } } +/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one +pub struct CombinedRecordBatchStream { + /// Schema wrapped by Arc + schema: SchemaRef, + /// Stream entries + entries: Vec<SendableRecordBatchStream>, +} + +impl CombinedRecordBatchStream { + /// Create a CombinedRecordBatchStream + pub fn new(schema: SchemaRef, entries: Vec<SendableRecordBatchStream>) -> Self { + Self { schema, entries } + } +} + +impl RecordBatchStream for CombinedRecordBatchStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Stream for CombinedRecordBatchStream { + type Item = ArrowResult<RecordBatch>; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + use Poll::*; + + let start = thread_rng_n(self.entries.len() as u32) as usize; + let mut idx = start; + + for _ in 0..self.entries.len() { + let stream = self.entries.get_mut(idx).unwrap(); + + match Pin::new(stream).poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => { + // Remove the entry + self.entries.swap_remove(idx); + + // Check if this was the last entry, if so the cursor needs + // to wrap + if idx == self.entries.len() { + idx = 0; + } else if idx < start && start <= self.entries.len() { + // The stream being swapped into the current index has + // already been polled, so skip it.
+                        idx = idx.wrapping_add(1) % self.entries.len();
+                    }
+                }
+                Pending => {
+                    idx = idx.wrapping_add(1) % self.entries.len();
+                }
+            }
+        }
+
+        // If the entry list is empty, the stream is complete.
+        if self.entries.is_empty() {
+            Ready(None)
+        } else {
+            Pending
+        }
+    }
+}
+
 /// Stream wrapper that records `BaselineMetrics` for a particular
 /// partition
 struct ObservedStream {
diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs
index 897936814cee..6ab4f7b82490 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -22,8 +22,8 @@ use super::{common, SendableRecordBatchStream, Statistics};
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
 use crate::physical_plan::{
-    memory::MemoryStream, ColumnarValue, DisplayFormatType, Distribution, ExecutionPlan,
-    Partitioning, PhysicalExpr,
+    memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning,
+    PhysicalExpr,
 };
 use crate::scalar::ScalarValue;
 use arrow::array::new_null_array;
@@ -108,11 +108,6 @@ impl ExecutionPlan for ValuesExec {
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![]
     }
-
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::UnspecifiedDistribution
-    }
-
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
         Partitioning::UnknownPartitioning(1)
@@ -122,10 +117,6 @@ impl ExecutionPlan for ValuesExec {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
     fn with_new_children(
         self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index 95582b2119de..0183ee6cc390 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -216,6 +216,8 @@ mod tests {
             ],
             input,
             schema.clone(),
+            vec![],
+            None,
         )?);
 
         let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
@@ -261,6 +263,8 @@ mod tests {
             )?],
             blocking_exec,
             schema,
+            vec![],
+            None,
         )?);
 
         let fut = collect(window_agg_exec, task_ctx);
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index e9eac35a3d88..76ad0afb10a1 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -24,8 +24,9 @@ use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
 use crate::physical_plan::{
-    common, ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan,
-    Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
+    common, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
+    ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
+    SendableRecordBatchStream, Statistics, WindowExpr,
 };
 use arrow::{
     array::ArrayRef,
@@ -35,6 +36,7 @@
 };
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
+use log::warn;
 use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -51,6 +53,10 @@ pub struct WindowAggExec {
     schema: SchemaRef,
     /// Schema before the window
     input_schema: SchemaRef,
+    /// Partition Keys
+    pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
+    /// Sort Keys
+    pub sort_keys: Option<Vec<PhysicalSortExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -61,6 +67,8 @@
         window_expr: Vec<Arc<dyn WindowExpr>>,
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
+        partition_keys: Vec<Arc<dyn PhysicalExpr>>,
+        sort_keys: Option<Vec<PhysicalSortExpr>>,
    ) -> Result<Self> {
         let schema = create_schema(&input_schema, &window_expr)?;
         let schema = Arc::new(schema);
@@ -69,6 +77,8 @@ impl WindowAggExec {
             window_expr,
             schema,
             input_schema,
+            partition_keys,
+            sort_keys,
             metrics: ExecutionPlanMetricsSet::new(),
         })
     }
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            warn!("No partition defined for WindowAggExec!!!");
+            vec![Distribution::SinglePartition]
         } else {
-            Distribution::UnspecifiedDistribution
+            // TODO: support PartitionCollections if there are no common partition columns in the window_expr
+            vec![Distribution::HashPartitioned(self.partition_keys.clone())]
         }
     }
 
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input.equivalence_properties()
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -143,6 +156,8 @@
             self.window_expr.clone(),
             children[0].clone(),
             self.input_schema.clone(),
+            self.partition_keys.clone(),
+            self.sort_keys.clone(),
         )?))
     }
 
diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs
index 20e7c6e79a48..8ecece85e938 100644
--- a/datafusion/core/src/scheduler/pipeline/execution.rs
+++ b/datafusion/core/src/scheduler/pipeline/execution.rs
@@ -235,8 +235,8 @@ impl ExecutionPlan for ProxyExecutionPlan {
         self.inner.output_ordering()
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        self.inner.required_child_distribution()
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        self.inner.required_input_distribution()
     }
 
     fn relies_on_input_order(&self) -> bool {
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index 7e9ece36b74c..3fb810e3ac0b 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -441,12 +441,8 @@ impl ExecutionPlan for TopKExec {
         None
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::SinglePartition]
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
new file mode 100644
index 000000000000..411a492a522f
--- /dev/null
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expressions::Column;
+
+use arrow::datatypes::SchemaRef;
+
+use std::collections::HashMap;
+use std::collections::HashSet;
+
+/// Equivalence Properties is a vec of EquivalentClass.
+#[derive(Debug, Default, Clone)]
+pub struct EquivalenceProperties {
+    classes: Vec<EquivalentClass>,
+}
+
+impl EquivalenceProperties {
+    pub fn new() -> Self {
+        EquivalenceProperties { classes: vec![] }
+    }
+
+    pub fn classes(&self) -> &[EquivalentClass] {
+        &self.classes
+    }
+
+    pub fn extend<I: IntoIterator<Item = EquivalentClass>>(&mut self, iter: I) {
+        self.classes.extend(iter)
+    }
+
+    /// Add new equal conditions into the EquivalenceProperties; the new conditions usually come
+    /// from the equality predicates in a Join or Filter
+    pub fn add_equal_conditions(&mut self, new_conditions: (&Column, &Column)) {
+        let mut idx1: Option<usize> = None;
+        let mut idx2: Option<usize> = None;
+        for (idx, class) in self.classes.iter_mut().enumerate() {
+            let contains_first = class.contains(new_conditions.0);
+            let contains_second = class.contains(new_conditions.1);
+            match (contains_first, contains_second) {
+                (true, false) => {
+                    class.insert(new_conditions.1.clone());
+                    idx1 = Some(idx);
+                }
+                (false, true) => {
+                    class.insert(new_conditions.0.clone());
+                    idx2 = Some(idx);
+                }
+                (true, true) => {
+                    idx1 = Some(idx);
+                    idx2 = Some(idx);
+                    break;
+                }
+                (false, false) => {}
+            }
+        }
+
+        match (idx1, idx2) {
+            (Some(idx_1), Some(idx_2)) if idx_1 != idx_2 => {
+                // need to merge the two existing EquivalentClasses
+                let second_eq_class = self.classes.get(idx_2).unwrap().clone();
+                let first_eq_class = self.classes.get_mut(idx_1).unwrap();
+                for prop in second_eq_class.iter() {
+                    if !first_eq_class.contains(prop) {
+                        first_eq_class.insert(prop.clone());
+                    }
+                }
+                self.classes.remove(idx_2);
+            }
+            (None, None) => {
+                // adding new pairs
+                self.classes.push(EquivalentClass::new(
+                    new_conditions.0.clone(),
+                    vec![new_conditions.1.clone()],
+                ));
+            }
+            _ => {}
+        }
+    }
+
+    pub fn merge_properties_with_alias(
+        &mut self,
+        alias_map: &HashMap<Column, Vec<Column>>,
+    ) {
+        for (column, columns) in alias_map {
+            let mut find_match = false;
+            for class in self.classes.iter_mut() {
+                if class.contains(column) {
+                    for col in columns {
+                        class.insert(col.clone());
+                    }
+                    find_match = true;
+                    break;
+                }
+            }
+            if !find_match {
+                self.classes
+                    .push(EquivalentClass::new(column.clone(), columns.clone()));
+            }
+        }
+    }
+
+    pub fn truncate_properties_not_in_schema(&mut self, schema: &SchemaRef) {
+        for class in self.classes.iter_mut() {
+            let mut columns_to_remove = vec![];
+            for column in class.iter() {
+                if let Ok(idx) = schema.index_of(column.name()) {
+                    if idx != column.index() {
+                        columns_to_remove.push(column.clone());
+                    }
+                } else {
+                    columns_to_remove.push(column.clone());
+                }
+            }
+            for column in columns_to_remove {
+                class.remove(&column);
+            }
+        }
+        self.classes.retain(|props| props.len() > 1);
+    }
+}
+
+/// Equivalent Class is a set of Columns that are known to have the same value in all tuples in a relation.
+/// Equivalent Classes are generated by equality predicates, typically equijoin conditions and equality conditions in filters.
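+/// For example, the predicate `a = b AND b = c` yields a single class containing {a, b, c}.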
+#[derive(Debug, Clone)]
+pub struct EquivalentClass {
+    /// First element in the EquivalentClass
+    head: Column,
+    /// Other equal columns
+    others: HashSet<Column>,
+}
+
+impl EquivalentClass {
+    pub fn new(head: Column, others: Vec<Column>) -> Self {
+        EquivalentClass {
+            head,
+            others: HashSet::from_iter(others),
+        }
+    }
+
+    pub fn head(&self) -> &Column {
+        &self.head
+    }
+
+    pub fn others(&self) -> &HashSet<Column> {
+        &self.others
+    }
+
+    pub fn contains(&self, col: &Column) -> bool {
+        self.head == *col || self.others.contains(col)
+    }
+
+    pub fn insert(&mut self, col: Column) -> bool {
+        self.others.insert(col)
+    }
+
+    pub fn remove(&mut self, col: &Column) -> bool {
+        let removed = self.others.remove(col);
+        if !removed && *col == self.head {
+            let one_col = self.others.iter().next().cloned();
+            if let Some(col) = one_col {
+                let removed = self.others.remove(&col);
+                self.head = col;
+                removed
+            } else {
+                false
+            }
+        } else {
+            true
+        }
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = &Column> {
+        std::iter::once(&self.head).chain(self.others.iter())
+    }
+
+    pub fn len(&self) -> usize {
+        self.others.len() + 1
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::expressions::Column;
+    use datafusion_common::Result;
+
+    #[test]
+    fn add_equal_conditions_test() -> Result<()> {
+        let mut eq_properties = EquivalenceProperties::new();
+        let new_condition = (&Column::new("a", 0), &Column::new("b", 1));
+        eq_properties.add_equal_conditions(new_condition);
+        assert_eq!(eq_properties.classes().len(), 1);
+
+        let new_condition = (&Column::new("b", 1), &Column::new("a", 0));
+        eq_properties.add_equal_conditions(new_condition);
+        assert_eq!(eq_properties.classes().len(), 1);
+        assert_eq!(eq_properties.classes()[0].len(), 2);
+
+        let new_condition = (&Column::new("b", 1), &Column::new("c", 2));
+        eq_properties.add_equal_conditions(new_condition);
+        assert_eq!(eq_properties.classes().len(), 1);
+        assert_eq!(eq_properties.classes()[0].len(), 3);
+
+        let new_condition = (&Column::new("x", 99), &Column::new("y", 100));
+        eq_properties.add_equal_conditions(new_condition);
+        assert_eq!(eq_properties.classes().len(), 2);
+
+        let new_condition = (&Column::new("x", 99), &Column::new("a", 0));
+        eq_properties.add_equal_conditions(new_condition);
+        assert_eq!(eq_properties.classes().len(), 1);
+        assert_eq!(eq_properties.classes()[0].len(), 5);
+
+        Ok(())
+    }
+
+    #[test]
+    fn merge_equivalence_properties_with_alias_test() -> Result<()> {
+        let mut eq_properties = EquivalenceProperties::new();
+        let mut alias_map = HashMap::new();
+        alias_map.insert(
+            Column::new("a", 0),
+            vec![Column::new("a1", 1), Column::new("a2", 2)],
+        );
+
+        eq_properties.merge_properties_with_alias(&alias_map);
+        assert_eq!(eq_properties.classes().len(), 1);
+        assert_eq!(eq_properties.classes()[0].len(), 3);
+
+        let mut alias_map = HashMap::new();
+        alias_map.insert(
+            Column::new("a", 0),
+            vec![Column::new("a3", 1), Column::new("a4", 2)],
+        );
+        eq_properties.merge_properties_with_alias(&alias_map);
+        assert_eq!(eq_properties.classes().len(), 1);
+        assert_eq!(eq_properties.classes()[0].len(), 5);
+        Ok(())
+    }
+}
diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs
index 450919aa3f2c..776f4dc0a506 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -118,6 +118,76 @@ impl PartialEq<dyn Any> for Column {
     }
 }
 
+/// Represents an unknown column without an index
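+/// It serves as a placeholder for columns that cannot be resolved against the
+/// current schema, e.g. after a projection drops or reorders the column.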
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
+pub struct UnKnownColumn {
+    name: String,
+}
+
+impl UnKnownColumn {
+    /// Create a new unknown column expression
+    pub fn new(name: &str) -> Self {
+        Self {
+            name: name.to_owned(),
+        }
+    }
+
+    /// Get the column name
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+impl std::fmt::Display for UnKnownColumn {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.name)
+    }
+}
+
+impl PhysicalExpr for UnKnownColumn {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    /// Get the data type of this expression, given the schema of the input
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Null)
+    }
+
+    /// Decide whether this expression is nullable, given the schema of the input
+    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+        Ok(true)
+    }
+
+    /// Evaluate the expression
+    fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
+        Err(DataFusionError::Plan(
+            "UnKnownColumn::evaluate() should not be called".to_owned(),
+        ))
+    }
+
+    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        Ok(self)
+    }
+}
+
+impl PartialEq<dyn Any> for UnKnownColumn {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| self == x)
+            .unwrap_or(false)
+    }
+}
+
 #[derive(Debug, Clone)]
 struct ColumnExprStats {
     index: usize,
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index d27737dd9881..26bb3ca1e64c 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -72,7 +72,7 @@ pub use case::{case, CaseExpr};
 pub use cast::{
     cast, cast_column, cast_with_options, CastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
 };
-pub use column::{col, Column};
+pub use column::{col, Column, UnKnownColumn};
 pub use datetime::DateTimeIntervalExpr;
 pub use get_indexed_field::GetIndexedFieldExpr;
 pub use in_list::{in_list, InListExpr};
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index d2b899dca785..026329e84b30 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -21,6 +21,7 @@ pub mod conditional_expressions;
 #[cfg(feature = "crypto_expressions")]
 pub mod crypto_expressions;
 pub mod datetime_expressions;
+pub mod equivalence;
 pub mod execution_props;
 pub mod expressions;
 pub mod functions;
@@ -46,7 +47,15 @@ pub mod window;
 // reexport this to maintain compatibility with anything that used from_slice previously
 pub use aggregate::AggregateExpr;
 pub use datafusion_common::from_slice;
+pub use equivalence::EquivalenceProperties;
+pub use equivalence::EquivalentClass;
 pub use physical_expr::{ExprBoundaries, PhysicalExpr, PhysicalExprStats};
 pub use planner::create_physical_expr;
 pub use scalar_function::ScalarFunctionExpr;
 pub use sort_expr::PhysicalSortExpr;
+pub use utils::{
+    expr_list_eq_any_order, expr_list_eq_strict_order,
+    normalize_expr_with_equivalence_properties, normalize_out_expr_with_alias_schema,
+    normalize_sort_expr_with_equivalence_properties, sort_expr_list_eq_strict_order,
+    split_conjunction,
+};
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 7ee947c0a7f9..78ce9c931bf0 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -15,9 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::equivalence::EquivalentClass;
+use crate::expressions::BinaryExpr;
+use crate::expressions::Column;
+use crate::expressions::UnKnownColumn;
+use crate::rewrite::TreeNodeRewritable;
 use crate::PhysicalExpr;
 use crate::PhysicalSortExpr;
+use datafusion_expr::Operator;
 
+use arrow::datatypes::SchemaRef;
+
+use std::collections::HashMap;
 use std::sync::Arc;
 
 /// Compare the two expr lists are equal no matter the order.
@@ -65,6 +74,117 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in CNF and split it into a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+pub fn split_conjunction(
+    predicate: &Arc<dyn PhysicalExpr>,
+) -> Vec<&Arc<dyn PhysicalExpr>> {
+    split_conjunction_impl(predicate, vec![])
+}
+
+fn split_conjunction_impl<'a>(
+    predicate: &'a Arc<dyn PhysicalExpr>,
+    mut exprs: Vec<&'a Arc<dyn PhysicalExpr>>,
+) -> Vec<&'a Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let exprs = split_conjunction_impl(binary.left(), exprs);
+                split_conjunction_impl(binary.right(), exprs)
+            }
+            _ => {
+                exprs.push(predicate);
+                exprs
+            }
+        },
+        None => {
+            exprs.push(predicate);
+            exprs
+        }
+    }
+}
+
+/// Normalize the output expressions based on the Alias Map and SchemaRef.
+///
+/// 1) If there is a mapping in the Alias Map, replace the Column in the output expressions with the 1st Column in the Alias Map
+/// 2) If the Column is invalid for the current Schema, replace the Column with a placeholder UnKnownColumn
+pub fn normalize_out_expr_with_alias_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+    schema: &SchemaRef,
+) -> Arc<dyn PhysicalExpr> {
+    let expr_clone = expr.clone();
+    expr_clone
+        .transform(&|expr| {
+            let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+                match expr.as_any().downcast_ref::<Column>() {
+                    Some(column) => {
+                        let out = alias_map
+                            .get(column)
+                            .map(|c| {
+                                let out_col: Arc<dyn PhysicalExpr> =
+                                    Arc::new(c[0].clone());
+                                out_col
+                            })
+                            .or_else(|| match schema.index_of(column.name()) {
+                                // Exact match; return None, no need to transform
+                                Ok(idx) if column.index() == idx => None,
+                                _ => {
+                                    let out_col: Arc<dyn PhysicalExpr> =
+                                        Arc::new(UnKnownColumn::new(column.name()));
+                                    Some(out_col)
+                                }
+                            });
+                        out
+                    }
+                    None => None,
+                };
+            normalized_form
+        })
+        .unwrap_or(expr)
+}
+
+pub fn normalize_expr_with_equivalence_properties(
+    expr: Arc<dyn PhysicalExpr>,
+    eq_properties: &[EquivalentClass],
+) -> Arc<dyn PhysicalExpr> {
+    let expr_clone = expr.clone();
+    expr_clone
+        .transform(&|expr| match expr.as_any().downcast_ref::<Column>() {
+            Some(column) => {
+                let mut normalized: Option<Arc<dyn PhysicalExpr>> = None;
+                for class in eq_properties {
+                    if class.contains(column) {
+                        normalized = Some(Arc::new(class.head().clone()));
+                        break;
+                    }
+                }
+                normalized
+            }
+            None => None,
+        })
+        .unwrap_or(expr)
+}
+
+pub fn normalize_sort_expr_with_equivalence_properties(
+    sort_expr: PhysicalSortExpr,
+    eq_properties: &[EquivalentClass],
+) -> PhysicalSortExpr {
+    let normalized_expr =
+        normalize_expr_with_equivalence_properties(sort_expr.expr.clone(), eq_properties);
+
+    if sort_expr.expr.ne(&normalized_expr) {
+        PhysicalSortExpr {
+            expr: normalized_expr,
+            options: sort_expr.options,
+        }
+    } else {
+        sort_expr
+    }
+}
+
 #[cfg(test)]
 mod tests {
 
@@ -77,7 +197,7 @@ mod tests {
     use std::sync::Arc;
 
     #[test]
-    fn expr_list_eq_any_order_test() -> Result<()> {
+    fn expr_list_eq_test() -> Result<()> {
         let list1: Vec<Arc<dyn PhysicalExpr>> = vec![
             Arc::new(Column::new("a", 0)),
             Arc::new(Column::new("a", 0)),
@@ -91,6 +211,15 @@ mod tests {
         assert!(!expr_list_eq_any_order(list1.as_slice(), list2.as_slice()));
         assert!(!expr_list_eq_any_order(list2.as_slice(), list1.as_slice()));
 
+        assert!(!expr_list_eq_strict_order(
+            list1.as_slice(),
+            list2.as_slice()
+        ));
+        assert!(!expr_list_eq_strict_order(
+            list2.as_slice(),
+            list1.as_slice()
+        ));
+
         let list3: Vec<Arc<dyn PhysicalExpr>> = vec![
             Arc::new(Column::new("a", 0)),
             Arc::new(Column::new("b", 1)),
@@ -110,6 +239,17 @@ mod tests {
         assert!(expr_list_eq_any_order(list3.as_slice(), list3.as_slice()));
         assert!(expr_list_eq_any_order(list4.as_slice(), list4.as_slice()));
 
+        assert!(!expr_list_eq_strict_order(
+            list3.as_slice(),
+            list4.as_slice()
+        ));
+        assert!(!expr_list_eq_strict_order(
+            list4.as_slice(),
+            list3.as_slice()
+        ));
+        assert!(expr_list_eq_any_order(list3.as_slice(), list3.as_slice()));
+        assert!(expr_list_eq_any_order(list4.as_slice(), list4.as_slice()));
+
         Ok(())
     }
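+
+    // A minimal sketch of how `sort_expr_list_eq_strict_order` could be
+    // exercised in the same style; it assumes `PhysicalSortExpr` is in scope
+    // and that `SortOptions` is imported from `arrow::compute`.
+    #[test]
+    fn sort_expr_list_eq_strict_order_test() -> Result<()> {
+        let options = SortOptions {
+            descending: false,
+            nulls_first: true,
+        };
+        let list1: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options,
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options,
+            },
+        ];
+        let list2: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options,
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options,
+            },
+        ];
+
+        // Strict-order comparison is position-sensitive: each list equals
+        // itself, but the reversed list does not compare equal.
+        assert!(sort_expr_list_eq_strict_order(
+            list1.as_slice(),
+            list1.as_slice()
+        ));
+        assert!(!sort_expr_list_eq_strict_order(
+            list1.as_slice(),
+            list2.as_slice()
+        ));
+
+        Ok(())
+    }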