diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index af6969c43cbd..ebc6fd6ce94a 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -341,9 +341,8 @@ pub async fn collect_partitioned( pub enum Partitioning { /// Allocate batches using a round-robin algorithm and the specified number of partitions RoundRobinBatch(usize), - /// Allocate rows based on a hash of one of more expressions and the specified - /// number of partitions - /// This partitioning scheme is not yet fully supported. See [ARROW-11011](https://issues.apache.org/jira/browse/ARROW-11011) + /// Allocate rows based on a hash of one or more expressions and the specified number of + /// partitions Hash(Vec>, usize), /// Unknown partitioning scheme with a known number of partitions UnknownPartitioning(usize), diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index d7451c787096..37ca10aaa0d7 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -222,11 +222,15 @@ impl DefaultPhysicalPlanner { .flat_map(|x| x.0.data_type(physical_input_schema.as_ref())) .any(|x| matches!(x, DataType::Dictionary(_, _))); - if !groups.is_empty() + let can_repartition = !groups.is_empty() && ctx_state.config.concurrency > 1 && ctx_state.config.repartition_aggregations - && !contains_dict - { + && !contains_dict; + + let (initial_aggr, next_partition_mode): ( + Arc, + AggregateMode, + ) = if can_repartition { // Divide partial hash aggregates into multiple partitions by hash key let hash_repartition = Arc::new(RepartitionExec::try_new( initial_aggr, @@ -235,35 +239,25 @@ impl DefaultPhysicalPlanner { ctx_state.config.concurrency, ), )?); - - // Combine hashaggregates within the partition - Ok(Arc::new(HashAggregateExec::try_new( - AggregateMode::FinalPartitioned, - final_group - .iter() - .enumerate() - .map(|(i, expr)| (expr.clone(), groups[i].1.clone())) - 
.collect(), - aggregates, - hash_repartition, - input_schema, - )?)) + // Combine hash aggregates within the partition + (hash_repartition, AggregateMode::FinalPartitioned) } else { - // construct a second aggregation, keeping the final column name equal to the first aggregation - // and the expressions corresponding to the respective aggregate + // construct a second aggregation, keeping the final column name equal to the + // first aggregation and the expressions corresponding to the respective aggregate + (initial_aggr, AggregateMode::Final) + }; - Ok(Arc::new(HashAggregateExec::try_new( - AggregateMode::Final, - final_group - .iter() - .enumerate() - .map(|(i, expr)| (expr.clone(), groups[i].1.clone())) - .collect(), - aggregates, - initial_aggr, - input_schema, - )?)) - } + Ok(Arc::new(HashAggregateExec::try_new( + next_partition_mode, + final_group + .iter() + .enumerate() + .map(|(i, expr)| (expr.clone(), groups[i].1.clone())) + .collect(), + aggregates, + initial_aggr, + input_schema, + )?)) } LogicalPlan::Projection { input, expr, .. } => { let input_exec = self.create_initial_plan(input, ctx_state)?;