diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 6bcc45e9e465..a561da7f7d42 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -800,7 +800,11 @@ impl DefaultPhysicalPlanner {
 
                 let can_repartition = !groups.is_empty()
                     && session_state.config().target_partitions() > 1
-                    && session_state.config().repartition_aggregations();
+                    && session_state.config().repartition_aggregations()
+                    && has_sufficient_rows_for_repartition(
+                        initial_aggr.input(),
+                        session_state,
+                    )?;
 
                 // Some aggregators may be modified during initialization for
                 // optimization purposes. For example, a FIRST_VALUE may turn
@@ -1578,6 +1582,34 @@ impl DefaultPhysicalPlanner {
     }
 }
 
+/// Reports whether an aggregation input has enough rows to justify hash
+/// repartitioning.
+///
+/// Returns `Ok(false)` only when `input` reports a row count below the
+/// session's configured batch size. If statistics cannot be computed, or
+/// carry no row count, the answer is `Ok(true)` so the caller's other
+/// repartitioning conditions decide on their own.
+///
+/// NOTE(review): `get_value` presumably yields estimated as well as exact
+/// counts - confirm that skipping repartitioning on an estimate is intended.
+fn has_sufficient_rows_for_repartition(
+    input: &Arc<dyn ExecutionPlan>,
+    session_state: &SessionState,
+) -> Result<bool> {
+    // Statistics are best-effort here: an error while computing them must not
+    // fail planning, so it is treated the same as "no information".
+    let Ok(stats) = input.partition_statistics(None) else {
+        return Ok(true);
+    };
+
+    let sufficient = match stats.num_rows.get_value() {
+        Some(&num_rows) => num_rows >= session_state.config().batch_size(),
+        // No row count available: do not block repartitioning.
+        None => true,
+    };
+    Ok(sufficient)
+}
+
 /// Expand and align a GROUPING SET expression.
 /// (see <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>)
 ///
diff --git a/datafusion/sqllogictest/test_files/repartition.slt b/datafusion/sqllogictest/test_files/repartition.slt
index f6061b61a249..dd879c41c96a 100644
--- a/datafusion/sqllogictest/test_files/repartition.slt
+++ b/datafusion/sqllogictest/test_files/repartition.slt
@@ -60,11 +60,8 @@ logical_plan
 01)Aggregate: groupBy=[[parquet_table.column1]], aggr=[[sum(CAST(parquet_table.column2 AS Int64))]]
 02)--TableScan: parquet_table projection=[column1, column2]
 physical_plan
-01)AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[sum(parquet_table.column2)]
-02)--CoalesceBatchesExec: target_batch_size=8192
-03)----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=1
-04)------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[sum(parquet_table.column2)]
-05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition/parquet_table/2.parquet]]}, projection=[column1, column2], file_type=parquet
+01)AggregateExec: mode=Single, gby=[column1@0 as column1], aggr=[sum(parquet_table.column2)]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition/parquet_table/2.parquet]]}, projection=[column1, column2], file_type=parquet
 
 # Cleanup