Skip to content

Commit

Permalink
Fix aggregate nullability calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 13, 2023
1 parent a98b6a0 commit 5ab75b1
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 290 deletions.
11 changes: 6 additions & 5 deletions datafusion/physical-expr/src/aggregate/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,11 +565,12 @@ where
let sums = std::mem::take(&mut self.sums);
let nulls = self.null_state.build();

assert_eq!(nulls.len(), sums.len());
assert_eq!(counts.len(), sums.len());

// don't evaluate averages with null inputs to avoid errors on null values
let array: PrimitiveArray<T> = if let Some(nulls) = nulls.as_ref() {
assert_eq!(nulls.len(), sums.len());

let array: PrimitiveArray<T> = if nulls.null_count() > 0 {
let mut builder = PrimitiveBuilder::<T>::with_capacity(nulls.len());
let iter = sums.into_iter().zip(counts.into_iter()).zip(nulls.iter());

Expand All @@ -587,7 +588,7 @@ where
.zip(counts.into_iter())
.map(|(sum, count)| (self.avg_fn)(sum, count))
.collect::<Result<Vec<_>>>()?;
PrimitiveArray::new(averages.into(), nulls) // no copy
PrimitiveArray::new(averages.into(), Some(nulls)) // no copy
};

// fix up decimal precision and scale for decimals
Expand All @@ -598,9 +599,9 @@ where

// return arrays for sums and counts
fn state(&mut self) -> Result<Vec<ArrayRef>> {
let nulls = self.null_state.build();
let nulls = Some(self.null_state.build());
let counts = std::mem::take(&mut self.counts);
let counts = UInt64Array::from(counts); // zero copy
let counts = UInt64Array::new(counts.into(), nulls.clone()); // zero copy

let sums = std::mem::take(&mut self.sums);
let sums = PrimitiveArray::<T>::new(sums.into(), nulls); // zero copy
Expand Down

0 comments on commit 5ab75b1

Please sign in to comment.