Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,11 @@ impl DefaultPhysicalPlanner {

let can_repartition = !groups.is_empty()
&& session_state.config().target_partitions() > 1
&& session_state.config().repartition_aggregations();
&& session_state.config().repartition_aggregations()
&& has_sufficient_rows_for_repartition(
initial_aggr.input(),
session_state,
)?;

// Some aggregators may be modified during initialization for
// optimization purposes. For example, a FIRST_VALUE may turn
Expand Down Expand Up @@ -1578,6 +1582,25 @@ impl DefaultPhysicalPlanner {
}
}

fn has_sufficient_rows_for_repartition(
input: &Arc<dyn ExecutionPlan>,
session_state: &SessionState,
) -> Result<bool> {
// Get partition statistics, default to repartitioning if unavailable
let stats = match input.partition_statistics(None) {
Ok(s) => s,
Err(_) => return Ok(true),
};

if let Some(num_rows) = stats.num_rows.get_value().copied() {
let batch_size = session_state.config().batch_size();

return Ok(num_rows >= batch_size);
}

Ok(true)
}

/// Expand and align a GROUPING SET expression.
/// (see <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>)
///
Expand Down
7 changes: 2 additions & 5 deletions datafusion/sqllogictest/test_files/repartition.slt
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,8 @@ logical_plan
01)Aggregate: groupBy=[[parquet_table.column1]], aggr=[[sum(CAST(parquet_table.column2 AS Int64))]]
02)--TableScan: parquet_table projection=[column1, column2]
physical_plan
01)AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[sum(parquet_table.column2)]
02)--CoalesceBatchesExec: target_batch_size=8192
03)----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=1
04)------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[sum(parquet_table.column2)]
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition/parquet_table/2.parquet]]}, projection=[column1, column2], file_type=parquet
01)AggregateExec: mode=Single, gby=[column1@0 as column1], aggr=[sum(parquet_table.column2)]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition/parquet_table/2.parquet]]}, projection=[column1, column2], file_type=parquet


# Cleanup
Expand Down
Loading