diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 181b7de7d9f71..2c009792fe126 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -985,6 +985,43 @@ mod test { scan_schema.clone(), )?); + let expect_partial_stat = Statistics { + num_rows: Precision::Exact(1), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics::new_unknown()], + }; + assert_eq!( + expect_partial_stat, + *agg_partial.partition_statistics(Some(0))? + ); + assert_eq!( + expect_partial_stat, + *agg_partial.partition_statistics(Some(1))? + ); + + let expect_partial_overall_stat = Statistics { + num_rows: Precision::Exact(2), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics::new_unknown()], + }; + assert_eq!( + expect_partial_overall_stat, + *agg_partial.partition_statistics(None)? + ); + + // Verify that the partial aggregate emits one accumulator-state row per + // output partition, even when the corresponding input partitions are empty. + let partitions = execute_stream_partitioned( + agg_partial.clone(), + Arc::new(TaskContext::default()), + )?; + assert_eq!(2, partitions.len()); + for partition_stream in partitions { + let result: Vec = partition_stream.try_collect().await?; + let rows = result.iter().map(|batch| batch.num_rows()).sum::(); + assert_eq!(1, rows); + } + let coalesce = Arc::new(CoalescePartitionsExec::new(agg_partial.clone())); let agg_final = Arc::new(AggregateExec::try_new( diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index c8b825d576e02..b2a54b5eba8a6 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -831,6 +831,7 @@ impl AggregateExec { &input, Arc::clone(&schema), &group_expr_mapping, + group_by.is_true_no_grouping(), &mode, &input_order_mode, aggr_expr.as_ref(), @@ -1021,6 +1022,7 @@ impl AggregateExec { input: &Arc, schema: SchemaRef, group_expr_mapping: &ProjectionMapping, + is_true_no_grouping: bool, mode: &AggregateMode, input_order_mode: &InputOrderMode, aggr_exprs: &[Arc], @@ -1030,9 +1032,12 @@ impl AggregateExec { .equivalence_properties() .project(group_expr_mapping, schema); - // If the group by is empty, then we ensure that the operator will produce - // only one row, and mark the generated result as a constant value. - if group_expr_mapping.is_empty() { + // True no-group aggregates produce only one row in each output + // partition, so aggregate outputs are constants within the partition. + // Grouping sets with empty grouping expressions are not covered here: + // their output schema can include grouping-set columns before the + // aggregate columns, so this aggregate-column mapping does not apply. + if is_true_no_grouping { let new_constants = aggr_exprs.iter().enumerate().map(|(idx, func)| { let column = Arc::new(Column::new(func.name(), idx)); ConstExpr::from(column as Arc) @@ -1091,6 +1096,11 @@ impl AggregateExec { /// Estimates output statistics for this aggregate node. /// + /// For aggregations without group-by expressions, row count follows the + /// number of logical aggregate rows and the aggregate output mode. True + /// no-group aggregates have one logical row; empty grouping sets have one + /// logical row per grouping-set occurrence. + /// /// For grouped aggregations with known input row count > 1, the output row /// count is estimated as: /// @@ -1128,7 +1138,11 @@ impl AggregateExec { /// - Per-set products are summed across all grouping sets /// - Requires NDV stats for ALL active group-by columns; if any lacks stats, /// falls back to `input_rows` (or `Absent` if that is also unknown) - fn statistics_inner(&self, child_statistics: &Statistics) -> Result { + fn statistics_inner( + &self, + child_statistics: &Statistics, + partition: Option, + ) -> Result { // TODO stats: group expressions: // - once expressions will be able to compute their own stats, use it here // - case where we group by on a column for which with have the `distinct` stat @@ -1152,20 +1166,18 @@ impl AggregateExec { column_statistics }; - match self.mode { - AggregateMode::Final | AggregateMode::FinalPartitioned - if self.group_by.expr.is_empty() => - { + match self.exact_output_rows_without_group_exprs(partition) { + Some(output_rows) => { let total_byte_size = - Self::calculate_scaled_byte_size(child_statistics, 1); + Self::calculate_scaled_byte_size(child_statistics, output_rows); Ok(Statistics { - num_rows: Precision::Exact(1), + num_rows: Precision::Exact(output_rows), column_statistics, total_byte_size, }) } - _ => { + None => { let num_rows = self.estimate_num_rows(child_statistics); let total_byte_size = num_rows @@ -1186,6 +1198,51 @@ impl AggregateExec { } } + /// Exact physical output row count for aggregates without group-by + /// expressions. + /// + /// `partition` follows [`ExecutionPlan::partition_statistics`]: `Some(_)` + /// requests one output partition, while `None` requests the entire plan. + /// Partial-state output contains the logical rows in each output partition; + /// final-value output contains the global logical rows once. + /// This mirrors execution, where partial aggregation without group-by + /// expressions emits its logical rows from every output partition, including + /// empty input partitions. + /// + /// Returns `None` when grouping expressions are present and grouped + /// cardinality estimation should be used instead. + fn exact_output_rows_without_group_exprs( + &self, + partition: Option, + ) -> Option { + let logical_rows = self.logical_rows_without_group_exprs()?; + + Some(match (self.mode.output_mode(), partition) { + (AggregateOutputMode::Final, _) => logical_rows, + (AggregateOutputMode::Partial, Some(_)) => logical_rows, + (AggregateOutputMode::Partial, None) => { + logical_rows * self.cache.output_partitioning().partition_count() + } + }) + } + + /// Exact number of logical aggregate rows for aggregates without group-by + /// expressions. + /// + /// A true no-group aggregate has one logical aggregate row. Empty grouping + /// sets have one logical aggregate row per grouping-set occurrence, even + /// when there are duplicate empty grouping sets. Returns `None` when there + /// are grouping expressions. + fn logical_rows_without_group_exprs(&self) -> Option { + if self.group_by.is_true_no_grouping() { + Some(1) + } else if self.group_by.expr.is_empty() { + Some(self.group_by.groups.len()) + } else { + None + } + } + /// Estimates the output row count for grouped aggregations, combining NDV, /// input row count, and TopK limit into a single [`Precision`]. fn estimate_num_rows(&self, child_statistics: &Statistics) -> Precision { @@ -1614,7 +1671,9 @@ impl ExecutionPlan for AggregateExec { fn partition_statistics(&self, partition: Option) -> Result> { let child_statistics = self.input().partition_statistics(partition)?; - Ok(Arc::new(self.statistics_inner(&child_statistics)?)) + Ok(Arc::new( + self.statistics_inner(&child_statistics, partition)?, + )) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -4119,6 +4178,26 @@ mod tests { let stats_zero = agg_zero.partition_statistics(None)?; assert_eq!(stats_zero.total_byte_size, Precision::Absent); + let single_input = + Arc::new(EmptyExec::new(Arc::clone(&schema))) as Arc; + let single_agg_zero = AggregateExec::try_new( + AggregateMode::Single, + PhysicalGroupBy::default(), + vec![count_a_aggregate(&schema)?], + vec![None], + single_input, + Arc::clone(&schema), + )?; + assert_eq!( + single_agg_zero + .properties() + .output_partitioning() + .partition_count(), + 1 + ); + let single_stats_zero = single_agg_zero.partition_statistics(None)?; + assert_eq!(single_stats_zero.num_rows, Precision::Exact(1)); + Ok(()) } @@ -4127,19 +4206,39 @@ mod tests { stats: Statistics, group_by: PhysicalGroupBy, limit: Option, + ) -> Result { + build_test_aggregate_with_mode( + schema, + stats, + group_by, + limit, + AggregateMode::Final, + ) + } + + fn count_a_aggregate(schema: &SchemaRef) -> Result> { + Ok(Arc::new( + AggregateExprBuilder::new(count_udaf(), vec![col("a", schema)?]) + .schema(Arc::clone(schema)) + .alias("COUNT(a)") + .build()?, + )) + } + + fn build_test_aggregate_with_mode( + schema: &SchemaRef, + stats: Statistics, + group_by: PhysicalGroupBy, + limit: Option, + mode: AggregateMode, ) -> Result { let input = Arc::new(StatisticsExec::new(stats, (**schema).clone())) as Arc; let mut agg = AggregateExec::try_new( - AggregateMode::Final, + mode, group_by, - vec![Arc::new( - AggregateExprBuilder::new(count_udaf(), vec![col("a", schema)?]) - .schema(Arc::clone(schema)) - .alias("COUNT(a)") - .build()?, - )], + vec![count_a_aggregate(schema)?], vec![None], input, Arc::clone(schema), @@ -4574,6 +4673,66 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_aggregate_stats_duplicate_empty_grouping_sets() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); + + let duplicate_empty_grouping_sets = + PhysicalGroupBy::new(vec![], vec![], vec![vec![], vec![]], true); + + let single_input = + Arc::new(EmptyExec::new(Arc::clone(&schema))) as Arc; + let single_agg = AggregateExec::try_new( + AggregateMode::Single, + duplicate_empty_grouping_sets.clone(), + vec![count_a_aggregate(&schema)?], + vec![None], + single_input, + Arc::clone(&schema), + )?; + assert_eq!( + single_agg.partition_statistics(None)?.num_rows, + Precision::Exact(2) + ); + + let partial_input = + Arc::new(EmptyExec::new(Arc::clone(&schema)).with_partitions(2)) + as Arc; + let partial_agg = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + duplicate_empty_grouping_sets, + vec![count_a_aggregate(&schema)?], + vec![None], + partial_input, + Arc::clone(&schema), + )?); + + assert_eq!( + partial_agg + .properties() + .output_partitioning() + .partition_count(), + 2 + ); + let task_ctx = Arc::new(TaskContext::default()); + for partition in 0..2 { + assert_eq!( + partial_agg.partition_statistics(Some(partition))?.num_rows, + Precision::Exact(2) + ); + let result = + collect(partial_agg.execute(partition, Arc::clone(&task_ctx))?).await?; + assert_eq!(result.iter().map(RecordBatch::num_rows).sum::(), 2); + } + + assert_eq!( + partial_agg.partition_statistics(None)?.num_rows, + Precision::Exact(4) + ); + + Ok(()) + } + #[test] fn test_aggregate_stats_non_column_expr_bails_out() -> Result<()> { use datafusion_common::ColumnStatistics;