Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 116 additions & 1 deletion datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,13 @@ impl OrderSensitiveArrayAggAccumulator {
} else {
(0..fields.len())
.map(|i| {
let column_values = self.ordering_values.iter().map(|x| x[i].clone());
let column_values: Box<dyn Iterator<Item = ScalarValue>> = if self
.reverse
{
Box::new(self.ordering_values.iter().rev().map(|x| x[i].clone()))
} else {
Box::new(self.ordering_values.iter().map(|x| x[i].clone()))
};
Comment thread
gabotechs marked this conversation as resolved.
ScalarValue::iter_to_array(column_values)
})
.collect::<Result<_>>()?
Expand Down Expand Up @@ -1512,6 +1518,115 @@ mod tests {
Ok(())
}

// Regression test for a partial/final mismatch in `state()`.
//
// When the optimizer configures a partial accumulator with
// is_input_pre_ordered=true and reverse=true (a DESC aggregate fed by input
// that is already sorted ASC), `state()` used to emit the values reversed
// while leaving the ordering keys un-reversed. A final accumulator consuming
// those states through `merge_batch` then paired values with the wrong keys,
// and its sort produced the wrong order.
#[test]
fn desc_order_partial_final_merge_correct() -> Result<()> {
    use arrow::array::Int64Array;
    use datafusion_physical_expr::expressions::Column;

    let schema = Schema::new(vec![
        Field::new("val", DataType::Int64, true),
        Field::new("ord", DataType::Int64, true),
    ]);
    let ord_expr: Arc<dyn PhysicalExpr> = Arc::new(
        Column::new_with_schema("ord", &schema).expect("column not in schema"),
    );

    // Single-column ordering on `ord`, nulls last, in the requested direction.
    let ordering_on_ord = |descending: bool| {
        LexOrdering::new(vec![PhysicalSortExpr::new(
            Arc::clone(&ord_expr),
            SortOptions {
                descending,
                nulls_first: false,
            },
        )])
        .unwrap()
    };
    // The partial stage runs with [ord ASC]: the input is pre-sorted ASC and
    // the user asked for DESC, so the optimizer reverses the requirement.
    let asc_ordering = ordering_on_ord(false);
    let desc_ordering = ordering_on_ord(true);

    let ordering_dtype = DataType::Int64;

    // Runs one partial accumulator (is_input_pre_ordered=true, reverse=true,
    // ordering_req=[ASC]) over a single batch and returns its state as arrays,
    // ready to be handed to `merge_batch`.
    let partial_state = |values: Vec<i64>, order_keys: Vec<i64>| -> Result<Vec<ArrayRef>> {
        let mut partial = OrderSensitiveArrayAggAccumulator::try_new(
            &DataType::Int64,
            std::slice::from_ref(&ordering_dtype),
            asc_ordering.clone(),
            /*is_input_pre_ordered=*/ true,
            /*reverse=*/ true,
            /*ignore_nulls=*/ false,
        )?;
        let value_column: ArrayRef = Arc::new(Int64Array::from(values));
        let order_column: ArrayRef = Arc::new(Int64Array::from(order_keys));
        partial.update_batch(&[value_column, order_column])?;

        let state = partial.state()?;
        state.iter().map(|scalar| scalar.to_array()).collect()
    };

    // Partial A sees rows [0,1,2] and partial B sees rows [3,4,5]; both
    // batches arrive already in ASC order.
    let state_a = partial_state(vec![0i64, 1, 2], vec![0i64, 1, 2])?;
    let state_b = partial_state(vec![3i64, 4, 5], vec![3i64, 4, 5])?;

    // The final accumulator is not optimized: ordering_req=[DESC], reverse=false.
    let mut final_acc = OrderSensitiveArrayAggAccumulator::try_new(
        &DataType::Int64,
        std::slice::from_ref(&ordering_dtype),
        desc_ordering,
        /*is_input_pre_ordered=*/ false,
        /*reverse=*/ false,
        /*ignore_nulls=*/ false,
    )?;
    for partial in [&state_a, &state_b] {
        final_acc.merge_batch(partial)?;
    }

    let ScalarValue::List(list) = final_acc.evaluate()? else {
        return datafusion_common::internal_err!("expected List");
    };
    let merged = list
        .values()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    let result_vals: Vec<i64> = merged.iter().map(|item| item.unwrap()).collect();

    // All six values must come out in DESC order.
    let expected: Vec<i64> = vec![5, 4, 3, 2, 1, 0];
    assert_eq!(result_vals, expected);
    Ok(())
}

struct ArrayAggAccumulatorBuilder {
return_field: FieldRef,
distinct: bool,
Expand Down
25 changes: 25 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,31 @@ physical_plan
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true

# Regression test: ARRAY_AGG with conflicting ASC/DESC ORDER BY in the same query.
# get_finer_aggregate_exprs_requirement picks ASC as the common requirement and
# reverses the DESC aggregate (is_reversed=true, ordering_req=[ASC]).
# The optimizer then sets is_input_pre_ordered=true on both. Without the fix,
# state() emits values reversed to DESC but ordering keys still in ASC order,
# causing merge_batch to pair each value with the wrong key (silent wrong results).
query TT
explain select array_agg(c1 order by c1), array_agg(c1 order by c1 desc) from agg_order;
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[array_agg(agg_order.c1) ORDER BY [agg_order.c1 ASC NULLS LAST], array_agg(agg_order.c1) ORDER BY [agg_order.c1 DESC NULLS FIRST]]]
02)--TableScan: agg_order projection=[c1]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[array_agg(agg_order.c1) ORDER BY [agg_order.c1 ASC NULLS LAST], array_agg(agg_order.c1) ORDER BY [agg_order.c1 DESC NULLS FIRST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[array_agg(agg_order.c1) ORDER BY [agg_order.c1 ASC NULLS LAST], array_agg(agg_order.c1) ORDER BY [agg_order.c1 DESC NULLS FIRST]]
04)------SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[true]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1], file_type=csv, has_header=true

query ??
select array_agg(c1 order by c1), array_agg(c1 order by c1 desc) from agg_order;
----
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10] [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
Comment thread
gabotechs marked this conversation as resolved.

# test array_agg_order with list data type
statement ok
CREATE TABLE array_agg_order_list_table AS VALUES
Expand Down
Loading