diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index ed4f2ff0bde4..f5ce01ad6c45 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -28,11 +28,14 @@ use crate::utils::collect_columns; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::stats::{ColumnStatistics, Precision}; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::tree_node::{ + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, +}; use datafusion_common::{ Result, ScalarValue, Statistics, assert_or_internal_err, internal_datafusion_err, plan_err, }; +use datafusion_expr_common::interval_arithmetic::Interval; use datafusion_physical_expr_common::metrics::ExecutionPlanMetricsSet; use datafusion_physical_expr_common::metrics::ExpressionEvaluatorMetrics; @@ -714,9 +717,9 @@ impl ProjectionExprs { } } } else { - // TODO stats: estimate more statistics from expressions - // (expressions should compute their statistics themselves) - ColumnStatistics::new_unknown() + // Propagate statistics through expressions (CAST, arithmetic, etc.) + // using the interval arithmetic system (evaluate_bounds). + project_column_statistics_through_expr(expr, &stats.column_statistics) }; column_statistics.push(col_stats); } @@ -726,6 +729,100 @@ impl ProjectionExprs { } } +/// Propagate min/max statistics through an expression using +/// [`PhysicalExpr::evaluate_bounds`]. Works for any expression that +/// implements `evaluate_bounds` (CAST, negation, arithmetic with literals, etc.). +/// +/// Only applied when the expression references a single column at most once in +/// the expression tree. 
Interval arithmetic treats each column reference as an +/// independent value, so an expression like `a - b` or `col * col` would +/// combine mins/maxes from possibly different rows (or the same column with +/// itself as an independent variable), producing an interval that is not a +/// valid min/max of the expression across the actual data. +fn project_column_statistics_through_expr( + expr: &Arc<dyn PhysicalExpr>, + column_stats: &[ColumnStatistics], +) -> ColumnStatistics { + let Some(single_column) = single_column_reference(expr) else { + return ColumnStatistics::new_unknown(); + }; + // With at most one column reference (and literals — always known — as the + // only other leaves), the exactness of the propagated interval reduces to + // the exactness of that column's stats. + let exact = single_column.is_none_or(|col| { + let stats = &column_stats[col.index()]; + stats.min_value.is_exact().unwrap_or(false) + && stats.max_value.is_exact().unwrap_or(false) + }); + match compute_bounds(expr.as_ref(), column_stats) { + Some(interval) => ColumnStatistics { + min_value: to_precision(interval.lower().clone(), exact), + max_value: to_precision(interval.upper().clone(), exact), + null_count: Precision::Absent, + distinct_count: Precision::Absent, + sum_value: Precision::Absent, + byte_size: Precision::Absent, + }, + None => ColumnStatistics::new_unknown(), + } +} + +/// Returns `Some(Some(col))` if `expr` references exactly one [`Column`] (a +/// single occurrence), `Some(None)` if it references no columns, and `None` if +/// it contains two or more [`Column`] nodes (same column or different). 
+fn single_column_reference(expr: &Arc<dyn PhysicalExpr>) -> Option<Option<Column>> { + let mut found: Option<Column> = None; + let mut multiple = false; + expr.apply(|e| { + if let Some(col) = e.downcast_ref::<Column>() { + if found.is_some() { + multiple = true; + return Ok(TreeNodeRecursion::Stop); + } + found = Some(col.clone()); + } + Ok(TreeNodeRecursion::Continue) + }) + .expect("closure never returns Err"); + if multiple { None } else { Some(found) } +} + +/// Convert a bound value to the appropriate [`Precision`] level. +fn to_precision(value: ScalarValue, exact: bool) -> Precision<ScalarValue> { + if value.is_null() { + Precision::Absent + } else if exact { + Precision::Exact(value) + } else { + Precision::Inexact(value) + } +} + +/// Recursively compute the output [`Interval`] for an expression by feeding +/// column statistics and literals into [`PhysicalExpr::evaluate_bounds`]. +fn compute_bounds( + expr: &dyn PhysicalExpr, + column_stats: &[ColumnStatistics], +) -> Option<Interval> { + if let Some(col) = expr.downcast_ref::<Column>() { + let stats = &column_stats[col.index()]; + let min = stats.min_value.get_value()?.clone(); + let max = stats.max_value.get_value()?.clone(); + return Interval::try_new(min, max).ok(); + } + if let Some(lit) = expr.downcast_ref::<Literal>() { + let val = lit.value().clone(); + return Interval::try_new(val.clone(), val).ok(); + } + let children = expr.children(); + let child_intervals = children + .iter() + .map(|child| compute_bounds(child.as_ref(), column_stats)) + .collect::<Option<Vec<_>>>()?; + let child_refs: Vec<&Interval> = child_intervals.iter().collect(); + expr.evaluate_bounds(&child_refs).ok() +} + impl<'a> IntoIterator for &'a ProjectionExprs { type Item = &'a ProjectionExpr; type IntoIter = std::slice::Iter<'a, ProjectionExpr>; @@ -1256,7 +1353,7 @@ pub(crate) mod tests { use super::*; use crate::equivalence::{EquivalenceProperties, convert_to_orderings}; - use crate::expressions::{BinaryExpr, col}; + use crate::expressions::{BinaryExpr, CastExpr, col}; use crate::utils::tests::TestScalarUDF; use 
crate::{PhysicalExprRef, ScalarFunctionExpr}; @@ -2772,13 +2869,17 @@ pub(crate) mod tests { // Should have 2 column statistics assert_eq!(output_stats.column_statistics.len(), 2); - // First column (expression) should have unknown statistics + // First column (col0 + 1) should have propagated min/max via evaluate_bounds assert_eq!( - output_stats.column_statistics[0].distinct_count, - Precision::Absent + output_stats.column_statistics[0].min_value, + Precision::Exact(ScalarValue::Int64(Some(-3))) ); assert_eq!( output_stats.column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int64(Some(22))) + ); + assert_eq!( + output_stats.column_statistics[0].distinct_count, Precision::Absent ); @@ -2791,6 +2892,46 @@ pub(crate) mod tests { Ok(()) } + #[test] + fn test_project_statistics_multi_column_expr_returns_unknown() -> Result<()> { + // Multi-column expressions cannot produce valid min/max via interval + // arithmetic because each column's min/max may come from a different + // row, so the combined interval is wider than the actual range of the + // expression over the data. 
+ let input_stats = get_stats(); + let input_schema = get_schema(); + + // SELECT col0 - CAST(col2 AS Int64) AS delta + let projection = ProjectionExprs::new(vec![ProjectionExpr { + expr: Arc::new(BinaryExpr::new( + Arc::new(Column::new("col0", 0)), + Operator::Minus, + Arc::new(CastExpr::new( + Arc::new(Column::new("col2", 2)), + DataType::Int64, + None, + )), + )), + alias: "delta".to_string(), + }]); + + let output_stats = projection.project_statistics( + input_stats, + &projection.project_schema(&input_schema)?, + )?; + + assert_eq!( + output_stats.column_statistics[0].min_value, + Precision::Absent + ); + assert_eq!( + output_stats.column_statistics[0].max_value, + Precision::Absent + ); + + Ok(()) + } + #[test] fn test_project_statistics_primitive_width_only() -> Result<()> { let input_stats = get_stats(); diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 75da1873263d..11f5c4ebb7e8 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -20,7 +20,9 @@ use datafusion_common::Result; use datafusion_common::config::ConfigOptions; use datafusion_common::scalar::ScalarValue; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_physical_plan::aggregates::{AggregateExec, AggregateInputMode}; +use datafusion_physical_plan::aggregates::{ + AggregateExec, AggregateInputMode, AggregateMode, +}; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs}; @@ -49,7 +51,7 @@ impl PhysicalOptimizerRule for AggregateStatistics { plan: Arc<dyn ExecutionPlan>, config: &ConfigOptions, ) -> Result<Arc<dyn ExecutionPlan>> { - if let Some(partial_agg_exec) = take_optimizable(&*plan) { + if let Some(partial_agg_exec) = take_optimizable(&plan) { let 
partial_agg_exec = partial_agg_exec .downcast_ref::<AggregateExec>() .expect("take_optimizable() ensures that this is a AggregateExec"); @@ -106,19 +108,37 @@ impl PhysicalOptimizerRule for AggregateStatistics { } } -/// assert if the node passed as argument is a final `AggregateExec` node that can be optimized: -/// - its child (with possible intermediate layers) is a partial `AggregateExec` node -/// - they both have no grouping expression +/// Returns an `AggregateExec` whose statistics can be used to replace the +/// entire aggregate with literal values, if the plan is eligible. /// -/// If this is the case, return a ref to the partial `AggregateExec`, else `None`. -/// We would have preferred to return a casted ref to AggregateExec but the recursion requires -/// the `ExecutionPlan.children()` method that returns an owned reference. -fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>> { - if let Some(final_agg_exec) = node.downcast_ref::<AggregateExec>() - && final_agg_exec.mode().input_mode() == AggregateInputMode::Partial - && final_agg_exec.group_expr().is_empty() +/// Two patterns are recognized: +/// +/// 1. **Final wrapping Partial** (multi-partition): A final `AggregateExec` +/// (input mode = `Partial`) with no GROUP BY whose descendant is a partial +/// `AggregateExec` (input mode = `Raw`) with no GROUP BY and no filters. +/// Returns the inner partial aggregate. +/// +/// 2. **Single / SinglePartitioned** (single-partition): A `Single` or +/// `SinglePartitioned` `AggregateExec` with no GROUP BY and no filters. +/// Returns the aggregate itself. 
+fn take_optimizable(plan: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> { + let agg_exec = plan.downcast_ref::<AggregateExec>()?; + + // Case 1: Single-mode aggregate — processes raw input, produces final output + if matches!( + agg_exec.mode(), + AggregateMode::Single | AggregateMode::SinglePartitioned + ) && agg_exec.group_expr().is_empty() + && agg_exec.filter_expr().iter().all(|e| e.is_none()) + { + return Some(Arc::clone(plan)); + } + + // Case 2: Final aggregate wrapping a Partial aggregate + if agg_exec.mode().input_mode() == AggregateInputMode::Partial + && agg_exec.group_expr().is_empty() { - let mut child = Arc::clone(final_agg_exec.input()); + let mut child = Arc::clone(agg_exec.input()); loop { if let Some(partial_agg_exec) = child.downcast_ref::<AggregateExec>() && partial_agg_exec.mode().input_mode() == AggregateInputMode::Raw diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 962d1d510395..763c978a6d0a 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -8951,3 +8951,64 @@ GROUP BY id ORDER BY id; statement ok DROP TABLE first_last_value_str_tests; + +# Regression test for incorrect MIN/MAX folding from projected expression +# statistics. Interval arithmetic over multi-column expressions combines the +# independent min/max of each column, which may come from different rows, so +# the resulting interval is not a valid min/max of the expression over the +# data. This query must execute against the actual rows rather than the +# per-column Parquet envelopes. 
+statement ok +SET datafusion.execution.target_partitions = 1; + +statement ok +CREATE EXTERNAL TABLE hits_raw +STORED AS PARQUET +LOCATION '../core/tests/data/clickbench_hits_10.parquet'; + +query II +SELECT MIN(delta), MAX(delta) +FROM ( + SELECT "UserID" - CAST("ClientIP" AS BIGINT) AS delta + FROM hits_raw +); +---- +-2461439044872611287 7418527518698834918 + +# Same class of bug with a single column referenced more than once. Interval +# arithmetic treats the two occurrences of `UserID` as independent values, so +# it would report `min(UserID) - max(UserID)` / `max(UserID) - min(UserID)` +# instead of the correct 0 / 0. +query II +SELECT MIN(zero), MAX(zero) +FROM ( + SELECT "UserID" - "UserID" AS zero + FROM hits_raw +); +---- +0 0 + +# Single-column, single-occurrence monotonic expressions: MIN/MAX should fold +# from parquet statistics via the aggregate-statistics optimizer and return +# the same values as computing MIN/MAX on the raw column and then applying +# the transformation. Covers negation, arithmetic with a literal, and CAST. 
+query II +SELECT MIN(-"UserID"), MAX(-"UserID") FROM hits_raw; +---- +-7418527520126366595 2461439046089301801 + +query II +SELECT MIN("UserID" + 1), MAX("UserID" + 1) FROM hits_raw; +---- +-2461439046089301800 7418527520126366596 + +query II +SELECT MIN(CAST("EventDate" AS BIGINT)), MAX(CAST("EventDate" AS BIGINT)) FROM hits_raw; +---- +15901 15901 + +statement ok +SET datafusion.execution.target_partitions = 4; + +statement ok +DROP TABLE hits_raw; diff --git a/datafusion/sqllogictest/test_files/clickbench.slt b/datafusion/sqllogictest/test_files/clickbench.slt index 314c3f9736e9..b1cf01a6443c 100644 --- a/datafusion/sqllogictest/test_files/clickbench.slt +++ b/datafusion/sqllogictest/test_files/clickbench.slt @@ -192,8 +192,8 @@ logical_plan 03)----Projection: CAST(CAST(hits_raw.EventDate AS Int32) AS Date32) AS EventDate 04)------TableScan: hits_raw projection=[EventDate] physical_plan -01)AggregateExec: mode=Single, gby=[], aggr=[min(hits.EventDate), max(hits.EventDate)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[CAST(CAST(EventDate@5 AS Int32) AS Date32) as EventDate], file_type=parquet +01)ProjectionExec: expr=[2013-07-15 as min(hits.EventDate), 2013-07-15 as max(hits.EventDate)] +02)--PlaceholderRowExec query DD SELECT MIN("EventDate"), MAX("EventDate") FROM hits;