Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,8 @@ pub async fn collect_partitioned(
pub enum Partitioning {
/// Allocate batches using a round-robin algorithm and the specified number of partitions
RoundRobinBatch(usize),
/// Allocate rows based on a hash of one of more expressions and the specified
/// number of partitions
/// This partitioning scheme is not yet fully supported. See [ARROW-11011](https://issues.apache.org/jira/browse/ARROW-11011)
/// Allocate rows based on a hash of one of more expressions and the specified number of
/// partitions
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
/// Unknown partitioning scheme with a known number of partitions
UnknownPartitioning(usize),
Expand Down
54 changes: 24 additions & 30 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,15 @@ impl DefaultPhysicalPlanner {
.flat_map(|x| x.0.data_type(physical_input_schema.as_ref()))
.any(|x| matches!(x, DataType::Dictionary(_, _)));

if !groups.is_empty()
let can_repartition = !groups.is_empty()
&& ctx_state.config.concurrency > 1
&& ctx_state.config.repartition_aggregations
&& !contains_dict
{
&& !contains_dict;

let (initial_aggr, next_partition_mode): (
Arc<dyn ExecutionPlan>,
AggregateMode,
) = if can_repartition {
// Divide partial hash aggregates into multiple partitions by hash key
let hash_repartition = Arc::new(RepartitionExec::try_new(
initial_aggr,
Expand All @@ -235,35 +239,25 @@ impl DefaultPhysicalPlanner {
ctx_state.config.concurrency,
),
)?);

// Combine hashaggregates within the partition
Ok(Arc::new(HashAggregateExec::try_new(
AggregateMode::FinalPartitioned,
final_group
.iter()
.enumerate()
.map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
.collect(),
aggregates,
hash_repartition,
input_schema,
)?))
// Combine hash aggregates within the partition
(hash_repartition, AggregateMode::FinalPartitioned)
} else {
// construct a second aggregation, keeping the final column name equal to the first aggregation
// and the expressions corresponding to the respective aggregate
// construct a second aggregation, keeping the final column name equal to the
// first aggregation and the expressions corresponding to the respective aggregate
(initial_aggr, AggregateMode::Final)
};

Ok(Arc::new(HashAggregateExec::try_new(
AggregateMode::Final,
final_group
.iter()
.enumerate()
.map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
.collect(),
aggregates,
initial_aggr,
input_schema,
)?))
}
Ok(Arc::new(HashAggregateExec::try_new(
next_partition_mode,
final_group
.iter()
.enumerate()
.map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
.collect(),
aggregates,
initial_aggr,
input_schema,
)?))
}
LogicalPlan::Projection { input, expr, .. } => {
let input_exec = self.create_initial_plan(input, ctx_state)?;
Expand Down