-
Notifications
You must be signed in to change notification settings - Fork 1.8k
[Part2] Partition and Sort Enforcement, ExecutionPlan enhancement #4043
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fa9511f
26fdffb
f21a2c1
c32d772
e945c37
33e9c18
8dc5d5f
4437235
fc18efc
fcfbf66
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -842,6 +842,8 @@ mod tests { | |
| use super::*; | ||
| use crate::execution::options::{CsvReadOptions, ParquetReadOptions}; | ||
| use crate::physical_plan::ColumnarValue; | ||
| use crate::physical_plan::Partitioning; | ||
| use crate::physical_plan::PhysicalExpr; | ||
| use crate::test_util; | ||
| use crate::test_util::parquet_test_data; | ||
| use crate::{assert_batches_sorted_eq, execution::context::SessionContext}; | ||
|
|
@@ -851,6 +853,7 @@ mod tests { | |
| avg, cast, count, count_distinct, create_udf, lit, max, min, sum, | ||
| BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFunction, | ||
| }; | ||
| use datafusion_physical_expr::expressions::Column; | ||
|
|
||
| #[tokio::test] | ||
| async fn select_columns() -> Result<()> { | ||
|
|
@@ -1515,4 +1518,163 @@ mod tests { | |
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn partition_aware_union() -> Result<()> { | ||
| let left = test_table().await?.select_columns(&["c1", "c2"])?; | ||
| let right = test_table_with_name("c2") | ||
| .await? | ||
| .select_columns(&["c1", "c3"])? | ||
| .with_column_renamed("c2.c1", "c2_c1")?; | ||
|
|
||
| let left_rows = left.collect().await?; | ||
| let right_rows = right.collect().await?; | ||
| let join1 = | ||
| left.join(right.clone(), JoinType::Inner, &["c1"], &["c2_c1"], None)?; | ||
| let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?; | ||
|
|
||
| let union = join1.union(join2)?; | ||
|
|
||
| let union_rows = union.collect().await?; | ||
|
|
||
| assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>()); | ||
| assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>()); | ||
| assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>()); | ||
|
|
||
| let physical_plan = union.create_physical_plan().await?; | ||
| let default_partition_count = | ||
| SessionContext::new().copied_config().target_partitions; | ||
|
|
||
| // For partition aware union, the output partition count should not be changed. | ||
| assert_eq!( | ||
| physical_plan.output_partitioning().partition_count(), | ||
| default_partition_count | ||
| ); | ||
| // For partition aware union, the output partitioning is the same as that of the union's inputs | ||
| for child in physical_plan.children() { | ||
| assert_eq!( | ||
| physical_plan.output_partitioning(), | ||
| child.output_partitioning() | ||
| ); | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn non_partition_aware_union() -> Result<()> { | ||
| let left = test_table().await?.select_columns(&["c1", "c2"])?; | ||
| let right = test_table_with_name("c2") | ||
| .await? | ||
| .select_columns(&["c1", "c2"])? | ||
| .with_column_renamed("c2.c1", "c2_c1")? | ||
| .with_column_renamed("c2.c2", "c2_c2")?; | ||
|
|
||
| let left_rows = left.collect().await?; | ||
| let right_rows = right.collect().await?; | ||
| let join1 = left.join( | ||
| right.clone(), | ||
| JoinType::Inner, | ||
| &["c1", "c2"], | ||
| &["c2_c1", "c2_c2"], | ||
| None, | ||
| )?; | ||
|
|
||
| // join key ordering is different | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| let join2 = left.join( | ||
| right, | ||
| JoinType::Inner, | ||
| &["c2", "c1"], | ||
| &["c2_c2", "c2_c1"], | ||
| None, | ||
| )?; | ||
|
|
||
| let union = join1.union(join2)?; | ||
|
|
||
| let union_rows = union.collect().await?; | ||
|
|
||
| assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>()); | ||
| assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>()); | ||
| assert_eq!(916, union_rows.iter().map(|x| x.num_rows()).sum::<usize>()); | ||
|
|
||
| let physical_plan = union.create_physical_plan().await?; | ||
| let default_partition_count = | ||
| SessionContext::new().copied_config().target_partitions; | ||
|
|
||
| // For non-partition aware union, the output partition count should be the sum of the partition counts of all the inputs | ||
| assert!(matches!( | ||
| physical_plan.output_partitioning(), | ||
| Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2)); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn verify_join_output_partitioning() -> Result<()> { | ||
| let left = test_table().await?.select_columns(&["c1", "c2"])?; | ||
| let right = test_table_with_name("c2") | ||
| .await? | ||
| .select_columns(&["c1", "c2"])? | ||
| .with_column_renamed("c2.c1", "c2_c1")? | ||
| .with_column_renamed("c2.c2", "c2_c2")?; | ||
|
|
||
| let all_join_types = vec![ | ||
| JoinType::Inner, | ||
| JoinType::Left, | ||
| JoinType::Right, | ||
| JoinType::Full, | ||
| JoinType::LeftSemi, | ||
| JoinType::RightSemi, | ||
| JoinType::LeftAnti, | ||
| JoinType::RightAnti, | ||
| ]; | ||
|
|
||
| let default_partition_count = | ||
| SessionContext::new().copied_config().target_partitions; | ||
|
|
||
| for join_type in all_join_types { | ||
| let join = left.join( | ||
| right.clone(), | ||
| join_type, | ||
| &["c1", "c2"], | ||
| &["c2_c1", "c2_c2"], | ||
| None, | ||
| )?; | ||
| let physical_plan = join.create_physical_plan().await?; | ||
| let out_partitioning = physical_plan.output_partitioning(); | ||
| let join_schema = physical_plan.schema(); | ||
|
|
||
| match join_type { | ||
| JoinType::Inner | ||
| | JoinType::Left | ||
| | JoinType::LeftSemi | ||
| | JoinType::LeftAnti => { | ||
| let left_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![ | ||
| Arc::new(Column::new_with_schema("c1", &join_schema).unwrap()), | ||
| Arc::new(Column::new_with_schema("c2", &join_schema).unwrap()), | ||
| ]; | ||
| assert_eq!( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 👍
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
| out_partitioning, | ||
| Partitioning::Hash(left_exprs, default_partition_count) | ||
| ); | ||
| } | ||
| JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { | ||
| let right_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![ | ||
| Arc::new(Column::new_with_schema("c2_c1", &join_schema).unwrap()), | ||
| Arc::new(Column::new_with_schema("c2_c2", &join_schema).unwrap()), | ||
| ]; | ||
| assert_eq!( | ||
| out_partitioning, | ||
| Partitioning::Hash(right_exprs, default_partition_count) | ||
| ); | ||
| } | ||
| JoinType::Full => { | ||
| assert!(matches!( | ||
| out_partitioning, | ||
| Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,7 @@ use datafusion_physical_expr::{ | |
| expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr, | ||
| }; | ||
| use std::any::Any; | ||
| use std::collections::HashMap; | ||
|
|
||
| use std::sync::Arc; | ||
|
|
||
|
|
@@ -45,9 +46,11 @@ mod no_grouping; | |
| mod row_hash; | ||
|
|
||
| use crate::physical_plan::aggregates::row_hash::GroupedHashAggregateStreamV2; | ||
| use crate::physical_plan::EquivalenceProperties; | ||
| pub use datafusion_expr::AggregateFunction; | ||
| use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator; | ||
| pub use datafusion_physical_expr::expressions::create_aggregate_expr; | ||
| use datafusion_physical_expr::normalize_out_expr_with_alias_schema; | ||
| use datafusion_row::{row_supported, RowType}; | ||
|
|
||
| /// Hash aggregate modes | ||
|
|
@@ -163,6 +166,9 @@ pub struct AggregateExec { | |
| /// same as input.schema() but for the final aggregate it will be the same as the input | ||
| /// to the partial aggregate | ||
| input_schema: SchemaRef, | ||
| /// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr | ||
| /// The key is the column from the input schema and the values are the columns from the output schema | ||
| alias_map: HashMap<Column, Vec<Column>>, | ||
| /// Execution Metrics | ||
| metrics: ExecutionPlanMetricsSet, | ||
| } | ||
|
|
@@ -186,13 +192,26 @@ impl AggregateExec { | |
|
|
||
| let schema = Arc::new(schema); | ||
|
|
||
| let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you explain what this code is for? It doesn't seem correct to me, as I don't understand the circumstances under which the output of `schema.index_of(name)` would be different from `column.index()` 🤔 It seems like in this case the input logical plan may have been incorrect?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is to deal with the case where there are aliases in the group exprs. In this case we cannot derive the output partitioning from the input/child directly; we need to take the aliases into consideration. This is similar to the […]. For example, the input has the output partitioning 'a', the […] |
||
| for (expression, name) in group_by.expr.iter() { | ||
| if let Some(column) = expression.as_any().downcast_ref::<Column>() { | ||
| let new_col_idx = schema.index_of(name)?; | ||
| // Treat the column as an alias when its name differs, or when the name is the same but its index differs | ||
| if (column.name() != name) || (column.index() != new_col_idx) { | ||
| let entry = alias_map.entry(column.clone()).or_insert_with(Vec::new); | ||
| entry.push(Column::new(name, new_col_idx)); | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| Ok(AggregateExec { | ||
| mode, | ||
| group_by, | ||
| aggr_expr, | ||
| input, | ||
| schema, | ||
| input_schema, | ||
| alias_map, | ||
| metrics: ExecutionPlanMetricsSet::new(), | ||
| }) | ||
| } | ||
|
|
@@ -255,25 +274,51 @@ impl ExecutionPlan for AggregateExec { | |
|
|
||
| /// Get the output partitioning of this plan | ||
| fn output_partitioning(&self) -> Partitioning { | ||
| self.input.output_partitioning() | ||
| match &self.mode { | ||
| AggregateMode::Partial => { | ||
| // Partial Aggregation will not change the output partitioning, but it needs to respect the aliases | ||
| let input_partition = self.input.output_partitioning(); | ||
| match input_partition { | ||
| Partitioning::Hash(exprs, part) => { | ||
| let normalized_exprs = exprs | ||
| .into_iter() | ||
| .map(|expr| { | ||
| normalize_out_expr_with_alias_schema( | ||
| expr, | ||
| &self.alias_map, | ||
| &self.schema, | ||
| ) | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
| Partitioning::Hash(normalized_exprs, part) | ||
| } | ||
| _ => input_partition, | ||
| } | ||
| } | ||
| // Final Aggregation's output partitioning is the same as its real input | ||
| _ => self.input.output_partitioning(), | ||
| } | ||
| } | ||
|
|
||
| fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { | ||
| None | ||
| } | ||
|
|
||
| fn required_child_distribution(&self) -> Distribution { | ||
| fn required_input_distribution(&self) -> Vec<Distribution> { | ||
| match &self.mode { | ||
| AggregateMode::Partial => Distribution::UnspecifiedDistribution, | ||
| AggregateMode::FinalPartitioned => Distribution::HashPartitioned( | ||
| self.group_by.expr.iter().map(|x| x.0.clone()).collect(), | ||
| ), | ||
| AggregateMode::Final => Distribution::SinglePartition, | ||
| AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution], | ||
| AggregateMode::FinalPartitioned => { | ||
| vec![Distribution::HashPartitioned(self.output_group_expr())] | ||
| } | ||
| AggregateMode::Final => vec![Distribution::SinglePartition], | ||
| } | ||
| } | ||
|
|
||
| fn relies_on_input_order(&self) -> bool { | ||
| false | ||
| fn equivalence_properties(&self) -> EquivalenceProperties { | ||
| let mut input_equivalence_properties = self.input.equivalence_properties(); | ||
| input_equivalence_properties.merge_properties_with_alias(&self.alias_map); | ||
| input_equivalence_properties.truncate_properties_not_in_schema(&self.schema); | ||
| input_equivalence_properties | ||
| } | ||
|
|
||
| fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.