Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions datafusion/core/tests/physical_optimizer/partition_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,43 @@ mod test {
scan_schema.clone(),
)?);

let expect_partial_stat = Statistics {
num_rows: Precision::Exact(1),
total_byte_size: Precision::Absent,
column_statistics: vec![ColumnStatistics::new_unknown()],
};
assert_eq!(
expect_partial_stat,
*agg_partial.partition_statistics(Some(0))?
);
assert_eq!(
expect_partial_stat,
*agg_partial.partition_statistics(Some(1))?
);

let expect_partial_overall_stat = Statistics {
num_rows: Precision::Exact(2),
total_byte_size: Precision::Absent,
column_statistics: vec![ColumnStatistics::new_unknown()],
};
assert_eq!(
expect_partial_overall_stat,
*agg_partial.partition_statistics(None)?
);

// Verify that the partial aggregate emits one accumulator-state row per
// output partition, even when the corresponding input partitions are empty.
let partitions = execute_stream_partitioned(
agg_partial.clone(),
Arc::new(TaskContext::default()),
)?;
assert_eq!(2, partitions.len());
for partition_stream in partitions {
let result: Vec<RecordBatch> = partition_stream.try_collect().await?;
let rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
assert_eq!(1, rows);
}

let coalesce = Arc::new(CoalescePartitionsExec::new(agg_partial.clone()));

let agg_final = Arc::new(AggregateExec::try_new(
Expand Down
197 changes: 178 additions & 19 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,7 @@ impl AggregateExec {
&input,
Arc::clone(&schema),
&group_expr_mapping,
group_by.is_true_no_grouping(),
&mode,
&input_order_mode,
aggr_expr.as_ref(),
Expand Down Expand Up @@ -1021,6 +1022,7 @@ impl AggregateExec {
input: &Arc<dyn ExecutionPlan>,
schema: SchemaRef,
group_expr_mapping: &ProjectionMapping,
is_true_no_grouping: bool,
mode: &AggregateMode,
input_order_mode: &InputOrderMode,
aggr_exprs: &[Arc<AggregateFunctionExpr>],
Expand All @@ -1030,9 +1032,12 @@ impl AggregateExec {
.equivalence_properties()
.project(group_expr_mapping, schema);

// If the group by is empty, then we ensure that the operator will produce
// only one row, and mark the generated result as a constant value.
if group_expr_mapping.is_empty() {
// True no-group aggregates produce only one row in each output
// partition, so aggregate outputs are constants within the partition.
// Grouping sets with empty grouping expressions are not covered here:
// their output schema can include grouping-set columns before the
// aggregate columns, so this aggregate-column mapping does not apply.
if is_true_no_grouping {
let new_constants = aggr_exprs.iter().enumerate().map(|(idx, func)| {
let column = Arc::new(Column::new(func.name(), idx));
ConstExpr::from(column as Arc<dyn PhysicalExpr>)
Expand Down Expand Up @@ -1091,6 +1096,11 @@ impl AggregateExec {

/// Estimates output statistics for this aggregate node.
///
/// For aggregations without group-by expressions, row count follows the
/// number of logical aggregate rows and the aggregate output mode. True
/// no-group aggregates have one logical row; empty grouping sets have one
/// logical row per grouping-set occurrence.
///
/// For grouped aggregations with known input row count > 1, the output row
/// count is estimated as:
///
Expand Down Expand Up @@ -1128,7 +1138,11 @@ impl AggregateExec {
/// - Per-set products are summed across all grouping sets
/// - Requires NDV stats for ALL active group-by columns; if any lacks stats,
/// falls back to `input_rows` (or `Absent` if that is also unknown)
fn statistics_inner(&self, child_statistics: &Statistics) -> Result<Statistics> {
fn statistics_inner(
&self,
child_statistics: &Statistics,
partition: Option<usize>,
) -> Result<Statistics> {
// TODO stats: group expressions:
// - once expressions will be able to compute their own stats, use it here
// - case where we group by on a column for which with have the `distinct` stat
Expand All @@ -1152,20 +1166,18 @@ impl AggregateExec {

column_statistics
};
match self.mode {
AggregateMode::Final | AggregateMode::FinalPartitioned
if self.group_by.expr.is_empty() =>
{
match self.exact_output_rows_without_group_exprs(partition) {
Some(output_rows) => {
let total_byte_size =
Self::calculate_scaled_byte_size(child_statistics, 1);
Self::calculate_scaled_byte_size(child_statistics, output_rows);

Ok(Statistics {
num_rows: Precision::Exact(1),
num_rows: Precision::Exact(output_rows),
column_statistics,
total_byte_size,
})
}
_ => {
None => {
let num_rows = self.estimate_num_rows(child_statistics);

let total_byte_size = num_rows
Expand All @@ -1186,6 +1198,51 @@ impl AggregateExec {
}
}

/// Exact physical output row count for aggregates without group-by
/// expressions.
///
/// `partition` follows [`ExecutionPlan::partition_statistics`]: `Some(_)`
/// requests one output partition, while `None` requests the entire plan.
/// Partial-state output contains the logical rows in each output partition;
/// final-value output contains the global logical rows once.
/// This mirrors execution, where partial aggregation without group-by
/// expressions emits its logical rows from every output partition, including
/// empty input partitions.
///
/// Returns `None` when grouping expressions are present and grouped
/// cardinality estimation should be used instead.
fn exact_output_rows_without_group_exprs(
&self,
partition: Option<usize>,
) -> Option<usize> {
let logical_rows = self.logical_rows_without_group_exprs()?;

Some(match (self.mode.output_mode(), partition) {
(AggregateOutputMode::Final, _) => logical_rows,
(AggregateOutputMode::Partial, Some(_)) => logical_rows,
(AggregateOutputMode::Partial, None) => {
logical_rows * self.cache.output_partitioning().partition_count()
}
})
}

/// Exact number of logical aggregate rows for aggregates without group-by
/// expressions.
///
/// A true no-group aggregate has one logical aggregate row. Empty grouping
/// sets have one logical aggregate row per grouping-set occurrence, even
/// when there are duplicate empty grouping sets. Returns `None` when there
/// are grouping expressions.
fn logical_rows_without_group_exprs(&self) -> Option<usize> {
if self.group_by.is_true_no_grouping() {
Some(1)
} else if self.group_by.expr.is_empty() {
Some(self.group_by.groups.len())
} else {
None
}
}

/// Estimates the output row count for grouped aggregations, combining NDV,
/// input row count, and TopK limit into a single [`Precision<usize>`].
fn estimate_num_rows(&self, child_statistics: &Statistics) -> Precision<usize> {
Expand Down Expand Up @@ -1614,7 +1671,9 @@ impl ExecutionPlan for AggregateExec {

fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
let child_statistics = self.input().partition_statistics(partition)?;
Ok(Arc::new(self.statistics_inner(&child_statistics)?))
Ok(Arc::new(
self.statistics_inner(&child_statistics, partition)?,
))
}

fn cardinality_effect(&self) -> CardinalityEffect {
Expand Down Expand Up @@ -4119,6 +4178,26 @@ mod tests {
let stats_zero = agg_zero.partition_statistics(None)?;
assert_eq!(stats_zero.total_byte_size, Precision::Absent);

let single_input =
Arc::new(EmptyExec::new(Arc::clone(&schema))) as Arc<dyn ExecutionPlan>;
let single_agg_zero = AggregateExec::try_new(
AggregateMode::Single,
PhysicalGroupBy::default(),
vec![count_a_aggregate(&schema)?],
vec![None],
single_input,
Arc::clone(&schema),
)?;
assert_eq!(
single_agg_zero
.properties()
.output_partitioning()
.partition_count(),
1
);
let single_stats_zero = single_agg_zero.partition_statistics(None)?;
assert_eq!(single_stats_zero.num_rows, Precision::Exact(1));

Ok(())
}

Expand All @@ -4127,19 +4206,39 @@ mod tests {
stats: Statistics,
group_by: PhysicalGroupBy,
limit: Option<LimitOptions>,
) -> Result<AggregateExec> {
build_test_aggregate_with_mode(
schema,
stats,
group_by,
limit,
AggregateMode::Final,
)
}

fn count_a_aggregate(schema: &SchemaRef) -> Result<Arc<AggregateFunctionExpr>> {
Ok(Arc::new(
AggregateExprBuilder::new(count_udaf(), vec![col("a", schema)?])
.schema(Arc::clone(schema))
.alias("COUNT(a)")
.build()?,
))
}

fn build_test_aggregate_with_mode(
schema: &SchemaRef,
stats: Statistics,
group_by: PhysicalGroupBy,
limit: Option<LimitOptions>,
mode: AggregateMode,
) -> Result<AggregateExec> {
let input = Arc::new(StatisticsExec::new(stats, (**schema).clone()))
as Arc<dyn ExecutionPlan>;

let mut agg = AggregateExec::try_new(
AggregateMode::Final,
mode,
group_by,
vec![Arc::new(
AggregateExprBuilder::new(count_udaf(), vec![col("a", schema)?])
.schema(Arc::clone(schema))
.alias("COUNT(a)")
.build()?,
)],
vec![count_a_aggregate(schema)?],
vec![None],
input,
Arc::clone(schema),
Expand Down Expand Up @@ -4574,6 +4673,66 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_aggregate_stats_duplicate_empty_grouping_sets() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));

let duplicate_empty_grouping_sets =
PhysicalGroupBy::new(vec![], vec![], vec![vec![], vec![]], true);

let single_input =
Arc::new(EmptyExec::new(Arc::clone(&schema))) as Arc<dyn ExecutionPlan>;
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
duplicate_empty_grouping_sets.clone(),
vec![count_a_aggregate(&schema)?],
vec![None],
single_input,
Arc::clone(&schema),
)?;
assert_eq!(
single_agg.partition_statistics(None)?.num_rows,
Precision::Exact(2)
);

let partial_input =
Arc::new(EmptyExec::new(Arc::clone(&schema)).with_partitions(2))
as Arc<dyn ExecutionPlan>;
let partial_agg = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
duplicate_empty_grouping_sets,
vec![count_a_aggregate(&schema)?],
vec![None],
partial_input,
Arc::clone(&schema),
)?);

assert_eq!(
partial_agg
.properties()
.output_partitioning()
.partition_count(),
2
);
let task_ctx = Arc::new(TaskContext::default());
for partition in 0..2 {
assert_eq!(
partial_agg.partition_statistics(Some(partition))?.num_rows,
Precision::Exact(2)
);
let result =
collect(partial_agg.execute(partition, Arc::clone(&task_ctx))?).await?;
assert_eq!(result.iter().map(RecordBatch::num_rows).sum::<usize>(), 2);
}

assert_eq!(
partial_agg.partition_statistics(None)?.num_rows,
Precision::Exact(4)
);

Ok(())
}

#[test]
fn test_aggregate_stats_non_column_expr_bails_out() -> Result<()> {
use datafusion_common::ColumnStatistics;
Expand Down
Loading