Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 130 additions & 34 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3246,6 +3246,8 @@ impl ScalarValue {

/// Retrieve ScalarValue for each row in `array`
///
/// Elements in `array` may be NULL, in which case the corresponding element in the returned vector is None.
///
/// Example 1: Array (ScalarValue::Int32)
/// ```
/// use datafusion_common::ScalarValue;
Expand All @@ -3262,15 +3264,15 @@ impl ScalarValue {
/// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap();
///
/// let expected = vec![
/// vec![
/// ScalarValue::Int32(Some(1)),
/// ScalarValue::Int32(Some(2)),
/// ScalarValue::Int32(Some(3)),
/// ],
/// vec![
/// ScalarValue::Int32(Some(4)),
/// ScalarValue::Int32(Some(5)),
/// ],
/// Some(vec![
/// ScalarValue::Int32(Some(1)),
/// ScalarValue::Int32(Some(2)),
/// ScalarValue::Int32(Some(3)),
/// ]),
/// Some(vec![
/// ScalarValue::Int32(Some(4)),
/// ScalarValue::Int32(Some(5)),
/// ]),
/// ];
///
/// assert_eq!(scalar_vec, expected);
Expand Down Expand Up @@ -3303,28 +3305,62 @@ impl ScalarValue {
/// ]);
///
/// let expected = vec![
/// vec![
/// Some(vec![
/// ScalarValue::List(Arc::new(l1)),
/// ScalarValue::List(Arc::new(l2)),
/// ],
/// ]),
/// ];
///
/// assert_eq!(scalar_vec, expected);
/// ```
///
/// Example 3: Nullable array
/// ```
/// use datafusion_common::ScalarValue;
/// use arrow::array::ListArray;
/// use arrow::datatypes::{DataType, Int32Type};
///
/// let list_arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
/// Some(vec![Some(1), Some(2), Some(3)]),
/// None,
/// Some(vec![Some(4), Some(5)])
/// ]);
///
/// // Convert the array into Scalar Values for each row
/// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap();
///
/// let expected = vec![
/// Some(vec![
/// ScalarValue::Int32(Some(1)),
/// ScalarValue::Int32(Some(2)),
/// ScalarValue::Int32(Some(3)),
/// ]),
/// None,
/// Some(vec![
/// ScalarValue::Int32(Some(4)),
/// ScalarValue::Int32(Some(5)),
/// ]),
/// ];
///
/// assert_eq!(scalar_vec, expected);
/// ```
pub fn convert_array_to_scalar_vec(array: &dyn Array) -> Result<Vec<Vec<Self>>> {
pub fn convert_array_to_scalar_vec(
array: &dyn Array,
) -> Result<Vec<Option<Vec<Self>>>> {
fn generic_collect<OffsetSize: OffsetSizeTrait>(
array: &dyn Array,
) -> Result<Vec<Vec<ScalarValue>>> {
) -> Result<Vec<Option<Vec<ScalarValue>>>> {
array
.as_list::<OffsetSize>()
.iter()
.map(|nested_array| match nested_array {
Some(nested_array) => (0..nested_array.len())
.map(|i| ScalarValue::try_from_array(&nested_array, i))
.collect::<Result<Vec<_>>>(),
// TODO: what can we put for null?
// https://github.com/apache/datafusion/issues/17749
None => Ok(vec![]),
.map(|nested_array| {
nested_array
.map(|array| {
(0..array.len())
.map(|i| ScalarValue::try_from_array(&array, i))
.collect::<Result<Vec<_>>>()
})
.transpose()
})
.collect()
}
Expand Down Expand Up @@ -9021,7 +9057,7 @@ mod tests {

#[test]
fn test_convert_array_to_scalar_vec() {
// Regular ListArray
// 1: Regular ListArray
let list = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1), Some(2)]),
None,
Expand All @@ -9031,17 +9067,20 @@ mod tests {
assert_eq!(
converted,
vec![
vec![ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2))],
vec![],
vec![
Some(vec![
ScalarValue::Int64(Some(1)),
ScalarValue::Int64(Some(2))
]),
None,
Some(vec![
ScalarValue::Int64(Some(3)),
ScalarValue::Int64(None),
ScalarValue::Int64(Some(4))
],
]),
]
);

// Regular LargeListArray
// 2: Regular LargeListArray
let large_list = LargeListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1), Some(2)]),
None,
Expand All @@ -9051,17 +9090,20 @@ mod tests {
assert_eq!(
converted,
vec![
vec![ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2))],
vec![],
vec![
Some(vec![
ScalarValue::Int64(Some(1)),
ScalarValue::Int64(Some(2))
]),
None,
Some(vec![
ScalarValue::Int64(Some(3)),
ScalarValue::Int64(None),
ScalarValue::Int64(Some(4))
],
]),
]
);

// Funky (null slot has non-zero list offsets)
// 3: Funky (null slot has non-zero list offsets)
// Offsets + Values looks like this: [[1, 2], [3, 4], [5]]
// But with NullBuffer it's like this: [[1, 2], NULL, [5]]
let funky = ListArray::new(
Expand All @@ -9074,9 +9116,63 @@ mod tests {
assert_eq!(
converted,
vec![
vec![ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2))],
vec![],
vec![ScalarValue::Int64(Some(5))],
Some(vec![
ScalarValue::Int64(Some(1)),
ScalarValue::Int64(Some(2))
]),
None,
Some(vec![ScalarValue::Int64(Some(5))]),
]
);

// 4: Offsets + Values looks like this: [[1, 2], [], [5]]
// But with NullBuffer it's like this: [[1, 2], NULL, [5]]
// The converted result is: [[1, 2], None, [5]]
let array4 = ListArray::new(
Field::new_list_field(DataType::Int64, true).into(),
OffsetBuffer::new(vec![0, 2, 2, 5].into()),
Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5, 6])),
Some(NullBuffer::from(vec![true, false, true])),
);
let converted = ScalarValue::convert_array_to_scalar_vec(&array4).unwrap();
assert_eq!(
converted,
vec![
Some(vec![
ScalarValue::Int64(Some(1)),
ScalarValue::Int64(Some(2))
]),
None,
Some(vec![
ScalarValue::Int64(Some(3)),
ScalarValue::Int64(Some(4)),
ScalarValue::Int64(Some(5)),
]),
]
);
Comment on lines +9128 to +9152
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On main the middle element here would be an empty vec, not None.


// 5: Offsets + Values looks like this: [[1, 2], [], [5]]
// Same as 4, but the middle array is not null, so after conversion it's empty.
let array5 = ListArray::new(
Field::new_list_field(DataType::Int64, true).into(),
OffsetBuffer::new(vec![0, 2, 2, 5].into()),
Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5, 6])),
Some(NullBuffer::from(vec![true, true, true])),
);
let converted = ScalarValue::convert_array_to_scalar_vec(&array5).unwrap();
assert_eq!(
converted,
vec![
Some(vec![
ScalarValue::Int64(Some(1)),
ScalarValue::Int64(Some(2))
]),
Some(vec![]),
Some(vec![
ScalarValue::Int64(Some(3)),
ScalarValue::Int64(Some(4)),
ScalarValue::Int64(Some(5)),
]),
]
Comment on lines +9154 to 9176
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On main the middle element here would also be an empty vec, not None.

);
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/aggregates/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async fn csv_query_array_agg_distinct() -> Result<()> {
let column = actual[0].column(0);
assert_eq!(column.len(), 1);
let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&column)?;
let mut scalars = scalar_vec[0].clone();
let mut scalars = scalar_vec[0].as_ref().unwrap().clone();

// workaround lack of Ord of ScalarValue
let cmp = |a: &ScalarValue, b: &ScalarValue| {
Expand Down
24 changes: 16 additions & 8 deletions datafusion/functions-aggregate-common/src/merge_arrays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl PartialOrd for CustomElement<'_> {

/// This function merges the `values` array (`&[Vec<ScalarValue>]`) into a single array `Vec<ScalarValue>`
/// Merging done according to ordering values stored inside `ordering_values` (`&[Vec<Vec<ScalarValue>>]`)
/// Inner `Vec<ScalarValue>` in the `ordering_values` can be thought as ordering information for the
/// Inner `Vec<ScalarValue>` in the `ordering_values` can be thought as ordering information for
/// each `ScalarValue` in the `values` array.
/// The desired ordering is specified by the `sort_options` argument (it should have the same size as the inner `Vec<ScalarValue>`
/// of the `ordering_values` array).
Expand Down Expand Up @@ -119,17 +119,25 @@ pub fn merge_ordered_arrays(
// Defines according to which ordering comparisons should be done.
sort_options: &[SortOptions],
) -> datafusion_common::Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
// Keep track the most recent data of each branch, in binary heap data structure.
// Keep track of the most recent data of each branch, in a binary heap data structure.
let mut heap = BinaryHeap::<CustomElement>::new();

if values.len() != ordering_values.len()
|| values
.iter()
.zip(ordering_values.iter())
.any(|(vals, ordering_vals)| vals.len() != ordering_vals.len())
if values.len() != ordering_values.len() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't exactly feel strongly about these changes, but since these two conditions are different I liked the error message telling me which was the case.

return exec_err!(
"Expects values and ordering_values to have same size but got {} and {}",
values.len(),
ordering_values.len()
);
}
if values
.iter()
.zip(ordering_values.iter())
.any(|(vals, ordering_vals)| vals.len() != ordering_vals.len())
{
return exec_err!(
"Expects values arguments and/or ordering_values arguments to have same size"
"Expects values elements and ordering_values elements to have same size but got {} and {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice fix; is it worth including index too?

values.len(),
ordering_values.len()
);
}
let n_branch = values.len();
Expand Down
11 changes: 7 additions & 4 deletions datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,13 +687,16 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {

// Convert array to Scalars to sort them easily. Convert back to array at evaluation.
let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;
for v in array_agg_res.into_iter() {
partition_values.push(v.into());
for maybe_v in array_agg_res.into_iter() {
if let Some(v) = maybe_v {
partition_values.push(v.into());
} else {
partition_values.push(vec![].into());
}
Comment on lines +693 to +695
Copy link
Contributor Author

@vegarsti vegarsti Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this one! With the other changes, the group_by slt tests fail without this change. It is because in merge_ordered_arrays we require that partition_values and partition_ordering_values have the same length (see code snippet below). But that seems weird since we now only do partition_ordering_values.push(ordering_value) if the element in orderings is Some.

if values.len() != ordering_values.len()
|| values
.iter()
.zip(ordering_values.iter())
.any(|(vals, ordering_vals)| vals.len() != ordering_vals.len())
{
return exec_err!(
"Expects values arguments and/or ordering_values arguments to have same size"
);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also an exact same change done in nth_values.rs

}

let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;

for partition_ordering_rows in orderings.into_iter() {
for partition_ordering_rows in orderings.into_iter().flatten() {
// Extract value from struct to ordering_rows for each group/partition
let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(s) = ordering_row {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/functions-aggregate/src/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl Accumulator for TrivialNthValueAccumulator {
// First entry in the state is the aggregation result.
let n_required = self.n.unsigned_abs() as usize;
let array_agg_res = ScalarValue::convert_array_to_scalar_vec(&states[0])?;
for v in array_agg_res.into_iter() {
for v in array_agg_res.into_iter().flatten() {
self.values.extend(v);
if self.values.len() > n_required {
// There is enough data collected, can stop merging:
Expand Down Expand Up @@ -457,14 +457,14 @@ impl Accumulator for NthValueAccumulator {
let mut partition_values = vec![self.values.clone()];
// First entry in the state is the aggregation result.
let array_agg_res = ScalarValue::convert_array_to_scalar_vec(&states[0])?;
for v in array_agg_res.into_iter() {
for v in array_agg_res.into_iter().flatten() {
partition_values.push(v.into());
}
// Stores ordering requirement expression results coming from each partition:
let mut partition_ordering_values = vec![self.ordering_values.clone()];
let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
// Extract value from struct to ordering_rows for each group/partition:
for partition_ordering_rows in orderings.into_iter() {
for partition_ordering_rows in orderings.into_iter().flatten() {
let ordering_values = partition_ordering_rows.into_iter().map(|ordering_row| {
let ScalarValue::Struct(s_array) = ordering_row else {
return exec_err!(
Expand Down
1 change: 1 addition & 0 deletions datafusion/functions-nested/src/array_has.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl ScalarUDFImpl for ArrayHas {
let list = scalar_values
.into_iter()
.flatten()
.flatten()
.map(|v| Expr::Literal(v, None))
.collect();

Expand Down