diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index b856a776c864..dbfc2c5da367 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -3302,30 +3302,33 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
     assert_snapshot!(
         pretty_format_batches(&sql_results).unwrap(),
         @r"
-    +---------------+-------------------------------------------------------------------------------------------------------------------------+
-    | plan_type     | plan                                                                                                                    |
-    +---------------+-------------------------------------------------------------------------------------------------------------------------+
-    | logical_plan  | Projection: t1.a, t1.b                                                                                                  |
-    |               |   Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0)       |
-    |               |     Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true                                         |
-    |               |       Left Join: t1.a = __scalar_sq_1.a                                                                                 |
-    |               |         TableScan: t1 projection=[a, b]                                                                                 |
-    |               |         SubqueryAlias: __scalar_sq_1                                                                                    |
-    |               |           Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true                                |
-    |               |             Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]]                                                       |
-    |               |               TableScan: t2 projection=[a]                                                                              |
-    | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1]                     |
-    |               |   CoalesceBatchesExec: target_batch_size=8192                                                                           |
-    |               |     HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
-    |               |       DataSourceExec: partitions=1, partition_sizes=[1]                                                                 |
-    |               |       ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true]                             |
-    |               |         AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))]                                    |
-    |               |           CoalesceBatchesExec: target_batch_size=8192                                                                   |
-    |               |             RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1                                            |
-    |               |               AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))]                                       |
-    |               |                 DataSourceExec: partitions=1, partition_sizes=[1]                                                       |
-    |               |                                                                                                                         |
-    +---------------+-------------------------------------------------------------------------------------------------------------------------+
+    +---------------+------------------------------------------------------------------------------------------------------------------------------+
+    | plan_type     | plan                                                                                                                         |
+    +---------------+------------------------------------------------------------------------------------------------------------------------------+
+    | logical_plan  | Projection: t1.a, t1.b                                                                                                       |
+    |               |   Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0)            |
+    |               |     Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true                                              |
+    |               |       Left Join: t1.a = __scalar_sq_1.a                                                                                      |
+    |               |         TableScan: t1 projection=[a, b]                                                                                      |
+    |               |         SubqueryAlias: __scalar_sq_1                                                                                         |
+    |               |           Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true                                     |
+    |               |             Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]]                                                            |
+    |               |               TableScan: t2 projection=[a]                                                                                   |
+    | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1]                          |
+    |               |   RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1                                                      |
+    |               |     ProjectionExec: expr=[a@2 as a, b@3 as b, count(*)@0 as count(*), __always_true@1 as __always_true]                     |
+    |               |       CoalesceBatchesExec: target_batch_size=8192                                                                           |
+    |               |         HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@1, a@0)], projection=[count(*)@0, __always_true@2, a@3, b@4] |
+    |               |           CoalescePartitionsExec                                                                                            |
+    |               |             ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true]                           |
+    |               |               AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))]                                  |
+    |               |                 CoalesceBatchesExec: target_batch_size=8192                                                                 |
+    |               |                   RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1                                          |
+    |               |                     AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))]                                     |
+    |               |                       DataSourceExec: partitions=1, partition_sizes=[1]                                                     |
+    |               |           DataSourceExec: partitions=1, partition_sizes=[1]                                                                 |
+    |               |                                                                                                                              |
+    +---------------+------------------------------------------------------------------------------------------------------------------------------+
     "
     );
 
@@ -3357,30 +3360,33 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
     assert_snapshot!(
         pretty_format_batches(&df_results).unwrap(),
         @r"
-    +---------------+-------------------------------------------------------------------------------------------------------------------------+
-    | plan_type     | plan                                                                                                                    |
-    +---------------+-------------------------------------------------------------------------------------------------------------------------+
-    | logical_plan  | Projection: t1.a, t1.b                                                                                                  |
-    |               |   Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0)       |
-    |               |     Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true                                         |
-    |               |       Left Join: t1.a = __scalar_sq_1.a                                                                                 |
-    |               |         TableScan: t1 projection=[a, b]                                                                                 |
-    |               |         SubqueryAlias: __scalar_sq_1                                                                                    |
-    |               |           Projection: count(*), t2.a, Boolean(true) AS __always_true                                                    |
-    |               |             Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1)) AS count(*)]]                                           |
-    |               |               TableScan: t2 projection=[a]                                                                              |
-    | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1]                     |
-    |               |   CoalesceBatchesExec: target_batch_size=8192                                                                           |
-    |               |     HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
-    |               |       DataSourceExec: partitions=1, partition_sizes=[1]                                                                 |
-    |               |       ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true]                                    |
-    |               |         AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)]                                           |
-    |               |           CoalesceBatchesExec: target_batch_size=8192                                                                   |
-    |               |             RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1                                            |
-    |               |               AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)]                                              |
-    |               |                 DataSourceExec: partitions=1, partition_sizes=[1]                                                       |
-    |               |                                                                                                                         |
-    +---------------+-------------------------------------------------------------------------------------------------------------------------+
+    +---------------+------------------------------------------------------------------------------------------------------------------------------+
+    | plan_type     | plan                                                                                                                         |
+    +---------------+------------------------------------------------------------------------------------------------------------------------------+
+    | logical_plan  | Projection: t1.a, t1.b                                                                                                       |
+    |               |   Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0)            |
+    |               |     Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true                                              |
+    |               |       Left Join: t1.a = __scalar_sq_1.a                                                                                      |
+    |               |         TableScan: t1 projection=[a, b]                                                                                      |
+    |               |         SubqueryAlias: __scalar_sq_1                                                                                         |
+    |               |           Projection: count(*), t2.a, Boolean(true) AS __always_true                                                         |
+    |               |             Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1)) AS count(*)]]                                                |
+    |               |               TableScan: t2 projection=[a]                                                                                   |
+    | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1]                          |
+    |               |   RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1                                                      |
+    |               |     ProjectionExec: expr=[a@2 as a, b@3 as b, count(*)@0 as count(*), __always_true@1 as __always_true]                     |
+    |               |       CoalesceBatchesExec: target_batch_size=8192                                                                           |
+    |               |         HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@1, a@0)], projection=[count(*)@0, __always_true@2, a@3, b@4] |
+    |               |           CoalescePartitionsExec                                                                                            |
+    |               |             ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true]                                  |
+    |               |               AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)]                                         |
+    |               |                 CoalesceBatchesExec: target_batch_size=8192                                                                 |
+    |               |                   RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1                                          |
+    |               |                     AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)]                                            |
+    |               |                       DataSourceExec: partitions=1, partition_sizes=[1]                                                     |
+    |               |           DataSourceExec: partitions=1, partition_sizes=[1]                                                                 |
+    |               |                                                                                                                              |
+    +---------------+------------------------------------------------------------------------------------------------------------------------------+
     "
     );
 
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index 7034b71fd500..7045cb8ea133 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -627,7 +627,7 @@ mod test {
 
         let expected_p0_statistics = Statistics {
             num_rows: Precision::Inexact(2),
-            total_byte_size: Precision::Absent,
+            total_byte_size: Precision::Inexact(110),
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Absent,
@@ -645,7 +645,7 @@ mod test {
 
         let expected_p1_statistics = Statistics {
             num_rows: Precision::Inexact(2),
-            total_byte_size: Precision::Absent,
+            total_byte_size: Precision::Inexact(110),
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Absent,
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 6bf59fd3d303..f175fd4cdba4 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -792,10 +792,13 @@ impl AggregateExec {
             AggregateMode::Final | AggregateMode::FinalPartitioned
                 if self.group_by.expr.is_empty() =>
             {
+                let total_byte_size =
+                    Self::calculate_scaled_byte_size(child_statistics, 1);
+
                 Ok(Statistics {
                     num_rows: Precision::Exact(1),
                     column_statistics,
-                    total_byte_size: Precision::Absent,
+                    total_byte_size,
                 })
             }
             _ => {
@@ -815,14 +818,48 @@ impl AggregateExec {
                 } else {
                     Precision::Absent
                 };
+
+                let total_byte_size = num_rows
+                    .get_value()
+                    .and_then(|&output_rows| {
+                        Self::calculate_scaled_byte_size(child_statistics, output_rows)
+                            .get_value()
+                            .map(|&bytes| Precision::Inexact(bytes))
+                    })
+                    .unwrap_or(Precision::Absent);
+
                 Ok(Statistics {
                     num_rows,
                     column_statistics,
-                    total_byte_size: Precision::Absent,
+                    total_byte_size,
                 })
             }
         }
     }
+
+    /// Calculate scaled byte size based on row count ratio.
+    /// Returns `Precision::Absent` if input statistics are insufficient.
+    /// Returns `Precision::Inexact` with the scaled value otherwise.
+    ///
+    /// This is a simple heuristic that assumes uniform row sizes.
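+    ///
+    /// Worked example (illustrative numbers only, not taken from any test):
+    /// an input of 100 rows totalling 1,000 bytes gives 10 bytes/row, so a
+    /// target of 3 output rows is estimated at ceil(10.0 * 3.0) = 30 bytes.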
+    #[inline]
+    fn calculate_scaled_byte_size(
+        input_stats: &Statistics,
+        target_row_count: usize,
+    ) -> Precision<usize> {
+        match (
+            input_stats.num_rows.get_value(),
+            input_stats.total_byte_size.get_value(),
+        ) {
+            (Some(&input_rows), Some(&input_bytes)) if input_rows > 0 => {
+                let bytes_per_row = input_bytes as f64 / input_rows as f64;
+                let scaled_bytes =
+                    (bytes_per_row * target_row_count as f64).ceil() as usize;
+                Precision::Inexact(scaled_bytes)
+            }
+            _ => Precision::Absent,
+        }
+    }
 }
 
 impl DisplayAs for AggregateExec {
@@ -1921,6 +1958,10 @@ mod tests {
             input_schema,
         )?);
 
+        // Verify statistics are preserved proportionally through aggregation
+        let final_stats = merged_aggregate.partition_statistics(None)?;
+        assert!(final_stats.total_byte_size.get_value().is_some());
+
         let task_ctx = if spill {
             // enlarge memory limit to let the final aggregation finish
             new_spill_ctx(2, 2600)
@@ -3146,4 +3187,77 @@ mod tests {
         run_test_with_spill_pool_if_necessary(20_000, false).await?;
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_aggregate_statistics_edge_cases() -> Result<()> {
+        use crate::test::exec::StatisticsExec;
+        use datafusion_common::ColumnStatistics;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Float64, false),
+        ]));
+
+        // Test 1: Absent statistics remain absent
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Exact(100),
+                total_byte_size: Precision::Absent,
+                column_statistics: vec![
+                    ColumnStatistics::new_unknown(),
+                    ColumnStatistics::new_unknown(),
+                ],
+            },
+            (*schema).clone(),
+        )) as Arc<dyn ExecutionPlan>;
+
+        let agg = Arc::new(AggregateExec::try_new(
+            AggregateMode::Final,
+            PhysicalGroupBy::default(),
+            vec![Arc::new(
+                AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema)?])
+                    .schema(Arc::clone(&schema))
+                    .alias("COUNT(a)")
+                    .build()?,
+            )],
+            vec![None],
+            input,
+            Arc::clone(&schema),
+        )?);
+
+        let stats = agg.partition_statistics(None)?;
+        assert_eq!(stats.total_byte_size, Precision::Absent);
+
+        // Test 2: Zero rows returns Absent (can't estimate output size from zero input)
+        let input_zero = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Exact(0),
+                total_byte_size: Precision::Exact(0),
+                column_statistics: vec![
+                    ColumnStatistics::new_unknown(),
+                    ColumnStatistics::new_unknown(),
+                ],
+            },
+            (*schema).clone(),
+        )) as Arc<dyn ExecutionPlan>;
+
+        let agg_zero = Arc::new(AggregateExec::try_new(
+            AggregateMode::Final,
+            PhysicalGroupBy::default(),
+            vec![Arc::new(
+                AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema)?])
+                    .schema(Arc::clone(&schema))
+                    .alias("COUNT(a)")
+                    .build()?,
+            )],
+            vec![None],
+            input_zero,
+            Arc::clone(&schema),
+        )?);
+
+        let stats_zero = agg_zero.partition_statistics(None)?;
+        assert_eq!(stats_zero.total_byte_size, Precision::Absent);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index 9e63f79f4545..c20598239c94 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -307,17 +307,17 @@ logical_plan
 physical_plan
 01)UnionExec
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, CAST(t2.id AS Int32)@2), (name@1, name@1)], NullsEqual: true
+03)----HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)], NullsEqual: true
 04)------CoalescePartitionsExec
-05)--------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
-06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
-08)--------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
-11)------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
-12)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-13)----------DataSourceExec: partitions=1, partition_sizes=[1]
+05)--------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
+06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+09)--------CoalesceBatchesExec: target_batch_size=2
+10)----------RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
+11)------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+12)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)----------------DataSourceExec: partitions=1, partition_sizes=[1]
 14)--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
 15)----CoalesceBatchesExec: target_batch_size=2
 16)------HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)], projection=[id@0, name@1], NullsEqual: true