diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 24edaaff1f09d..33c48f8bb725d 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -1008,7 +1008,13 @@ impl OrderSensitiveArrayAggAccumulator { } else { (0..fields.len()) .map(|i| { - let column_values = self.ordering_values.iter().map(|x| x[i].clone()); + let column_values: Box> = if self + .reverse + { + Box::new(self.ordering_values.iter().rev().map(|x| x[i].clone())) + } else { + Box::new(self.ordering_values.iter().map(|x| x[i].clone())) + }; ScalarValue::iter_to_array(column_values) }) .collect::>()? @@ -1512,6 +1518,115 @@ mod tests { Ok(()) } + // Reproduces the bug where `state()` emits reversed values but non-reversed + // orderings when the optimizer sets is_input_pre_ordered=true + reverse=true + // (DESC aggregate with ASC pre-sorted input). The partial states are fed into + // a final accumulator via merge_batch; without the fix the ordering keys and + // values are mismatched so the final sort produces wrong order. + #[test] + fn desc_order_partial_final_merge_correct() -> Result<()> { + use arrow::array::Int64Array; + use datafusion_physical_expr::expressions::Column; + + let schema = Schema::new(vec![ + Field::new("val", DataType::Int64, true), + Field::new("ord", DataType::Int64, true), + ]); + let ord_expr = Arc::new( + Column::new_with_schema("ord", &schema).expect("column not in schema"), + ) as Arc; + + // ordering_req for partial = [ord ASC] (reversed, because input is pre-sorted ASC + // and the user wants DESC — the optimizer reverses the requirement) + let asc_opts = SortOptions { + descending: false, + nulls_first: false, + }; + let desc_opts = SortOptions { + descending: true, + nulls_first: false, + }; + + let asc_ordering = LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::clone(&ord_expr), + asc_opts, + )]) + .unwrap(); + let desc_ordering = LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::clone(&ord_expr), + desc_opts, + )]) + .unwrap(); + + let ordering_dtype = DataType::Int64; + + // Partial acc A: sees rows [0,1,2] arriving in ASC order (pre-ordered). + // is_input_pre_ordered=true, reverse=true, ordering_req=[ASC]. + let mut partial_a = OrderSensitiveArrayAggAccumulator::try_new( + &DataType::Int64, + std::slice::from_ref(&ordering_dtype), + asc_ordering.clone(), + /*is_input_pre_ordered=*/ true, + /*reverse=*/ true, + /*ignore_nulls=*/ false, + )?; + let vals_a = Arc::new(Int64Array::from(vec![0i64, 1, 2])) as ArrayRef; + let ords_a = Arc::new(Int64Array::from(vec![0i64, 1, 2])) as ArrayRef; + partial_a.update_batch(&[vals_a, ords_a])?; + let state_a = partial_a + .state()? + .iter() + .map(|v| v.to_array()) + .collect::>>()?; + + // Partial acc B: sees rows [3,4,5] arriving in ASC order. + let mut partial_b = OrderSensitiveArrayAggAccumulator::try_new( + &DataType::Int64, + std::slice::from_ref(&ordering_dtype), + asc_ordering, + /*is_input_pre_ordered=*/ true, + /*reverse=*/ true, + /*ignore_nulls=*/ false, + )?; + let vals_b = Arc::new(Int64Array::from(vec![3i64, 4, 5])) as ArrayRef; + let ords_b = Arc::new(Int64Array::from(vec![3i64, 4, 5])) as ArrayRef; + partial_b.update_batch(&[vals_b, ords_b])?; + let state_b = partial_b + .state()? + .iter() + .map(|v| v.to_array()) + .collect::>>()?; + + // Final acc: not optimized — ordering_req=[DESC], reverse=false. + let mut final_acc = OrderSensitiveArrayAggAccumulator::try_new( + &DataType::Int64, + std::slice::from_ref(&ordering_dtype), + desc_ordering, + /*is_input_pre_ordered=*/ false, + /*reverse=*/ false, + /*ignore_nulls=*/ false, + )?; + final_acc.merge_batch(&state_a)?; + final_acc.merge_batch(&state_b)?; + let result = final_acc.evaluate()?; + + let ScalarValue::List(list) = result else { + return datafusion_common::internal_err!("expected List"); + }; + let result_vals: Vec = list + .values() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect(); + + // Expected DESC: [5, 4, 3, 2, 1, 0] + assert_eq!(result_vals, vec![5i64, 4, 3, 2, 1, 0]); + Ok(()) + } + struct ArrayAggAccumulatorBuilder { return_field: FieldRef, distinct: bool, diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 25b69d16dd035..e9e61ec541256 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -322,6 +322,31 @@ physical_plan 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true +# Regression test: ARRAY_AGG with conflicting ASC/DESC ORDER BY in the same query. +# get_finer_aggregate_exprs_requirement picks ASC as the common requirement and +# reverses the DESC aggregate (is_reversed=true, ordering_req=[ASC]). +# The optimizer then sets is_input_pre_ordered=true on both. Without the fix, +# state() emits values reversed to DESC but ordering keys still in ASC order, +# causing merge_batch to pair each value with the wrong key (silent wrong results). +query TT +explain select array_agg(c1 order by c1), array_agg(c1 order by c1 desc) from agg_order; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[array_agg(agg_order.c1) ORDER BY [agg_order.c1 ASC NULLS LAST], array_agg(agg_order.c1) ORDER BY [agg_order.c1 DESC NULLS FIRST]]] +02)--TableScan: agg_order projection=[c1] +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[array_agg(agg_order.c1) ORDER BY [agg_order.c1 ASC NULLS LAST], array_agg(agg_order.c1) ORDER BY [agg_order.c1 DESC NULLS FIRST]] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[array_agg(agg_order.c1) ORDER BY [agg_order.c1 ASC NULLS LAST], array_agg(agg_order.c1) ORDER BY [agg_order.c1 DESC NULLS FIRST]] +04)------SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[true] +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1], file_type=csv, has_header=true + +query ?? +select array_agg(c1 order by c1), array_agg(c1 order by c1 desc) from agg_order; +---- +[1, 2, 3, 4, 5, 6, 7, 8, 9, 10] [10, 9, 8, 7, 6, 5, 4, 3, 2, 1] + # test array_agg_order with list data type statement ok CREATE TABLE array_agg_order_list_table AS VALUES