From 9ef88c77d3e3f170cdf524f3357a187101010064 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Fri, 10 Nov 2023 14:24:37 +0100 Subject: [PATCH 1/7] Refactor numeric aggregation kernels to make better use of auto-vectorization. Remove the explicit simd implementations since the autovectorized versions are faster on average. The min/max kernels for floating point numbers now use the total order relation. --- arrow-arith/src/aggregate.rs | 714 ++++++++++++----------------- arrow-array/src/arithmetic.rs | 93 +++- arrow-buffer/src/buffer/boolean.rs | 1 + 3 files changed, 376 insertions(+), 432 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index 0dabaa50f5f..64233753084 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -20,39 +20,298 @@ use arrow_array::cast::*; use arrow_array::iterator::ArrayIter; use arrow_array::*; -use arrow_buffer::ArrowNativeType; +use arrow_buffer::{ArrowNativeType, NullBuffer}; use arrow_data::bit_iterator::try_for_each_valid_idx; use arrow_schema::ArrowError; use arrow_schema::*; +use std::borrow::BorrowMut; use std::ops::{BitAnd, BitOr, BitXor}; -/// Generic test for NaN, the optimizer should be able to remove this for integer types. -#[inline] -pub(crate) fn is_nan(a: T) -> bool { - #[allow(clippy::eq_op)] - !(a == a) +#[inline(always)] +fn select(m: bool, a: T, b: T) -> T { + if m { + a + } else { + b + } } -/// Returns the minimum value in the array, according to the natural order. -/// For floating point arrays any NaN values are considered to be greater than any other non-null value -#[cfg(not(feature = "simd"))] -pub fn min(array: &PrimitiveArray) -> Option -where - T: ArrowNumericType, - T::Native: ArrowNativeType, -{ - min_max_helper::(array, |a, b| (is_nan(*a) & !is_nan(*b)) || a > b) +trait NumericAccumulator: Copy + Default { + fn accumulate(&mut self, value: T); + fn accumulate_nullable(&mut self, value: T, valid: bool); + fn merge(&mut self, other: Self); + fn finish(&mut self) -> T; } -/// Returns the maximum value in the array, according to the natural order. -/// For floating point arrays any NaN values are considered to be greater than any other non-null value -#[cfg(not(feature = "simd"))] -pub fn max(array: &PrimitiveArray) -> Option -where - T: ArrowNumericType, - T::Native: ArrowNativeType, -{ - min_max_helper::(array, |a, b| (!is_nan(*a) & is_nan(*b)) || a < b) +#[derive(Clone, Copy)] +struct SumAccumulator { + sum: T, +} + +impl Default for SumAccumulator { + fn default() -> Self { + Self { sum: T::ZERO } + } +} + +impl NumericAccumulator for SumAccumulator { + fn accumulate(&mut self, value: T) { + self.sum = self.sum.add_wrapping(value); + } + + fn accumulate_nullable(&mut self, value: T, valid: bool) { + let sum = self.sum; + self.sum = select(valid, sum.add_wrapping(value), sum) + } + + fn merge(&mut self, other: Self) { + self.sum = self.sum.add_wrapping(other.sum); + } + + fn finish(&mut self) -> T { + self.sum + } +} + +#[derive(Clone, Copy)] +struct MinAccumulator { + min: T, +} + +impl Default for MinAccumulator { + fn default() -> Self { + Self { min: T::MAX } + } +} + +impl NumericAccumulator for MinAccumulator { + fn accumulate(&mut self, value: T) { + let min = self.min; + self.min = select(value.is_lt(min), value, min); + } + + fn accumulate_nullable(&mut self, value: T, valid: bool) { + let min = self.min; + let is_lt = valid & value.is_lt(min); + self.min = select(is_lt, value, min); + } + + fn merge(&mut self, other: Self) { + self.accumulate(other.min) + } + + fn finish(&mut self) -> T { + self.min + } +} + +#[derive(Clone, Copy)] +struct MaxAccumulator { + max: T, +} + +impl Default for MaxAccumulator { + fn default() -> Self { + Self { max: T::MIN } + } +} + +impl NumericAccumulator for MaxAccumulator { + fn accumulate(&mut self, value: T) { + let max = self.max; + self.max = select(value.is_gt(max), value, max); + } + + fn accumulate_nullable(&mut self, value: T, valid: bool) { + let max = self.max; + let is_gt = value.is_gt(max) & valid; + self.max = select(is_gt, value, max); + } + + fn merge(&mut self, other: Self) { + self.accumulate(other.max) + } + + fn finish(&mut self) -> T { + self.max + } +} + +fn reduce_accumulators, const LANES: usize>( + mut acc: [A; LANES], +) -> A { + assert!(LANES > 0 && LANES.is_power_of_two()); + let mut len = LANES; + + // attempt at tree reduction, unfortunately llvm does not fully recognize this pattern, + // but the generated code is still a little faster than purely sequential reduction for floats. + while len >= 2 { + let mid = len / 2; + let (h, t) = acc[..len].split_at_mut(mid); + + for i in 0..mid { + h[i].merge(t[i]); + } + len /= 2; + } + acc[0] +} + +#[inline(always)] +fn aggregate_nonnull_chunk, const LANES: usize>( + acc: &mut [A; LANES], + values: &[T; LANES], +) { + for i in 0..LANES { + acc[i].accumulate(values[i]); + } +} + +#[inline(always)] +fn aggregate_nullable_chunk, const LANES: usize>( + acc: &mut [A; LANES], + values: &[T; LANES], + validity: u64, +) { + let mut bit = 1; + for i in 0..LANES { + acc[i].accumulate_nullable(values[i], (validity & bit) != 0); + bit <<= 1; + } +} + +fn aggregate_nonnull_simple>(values: &[T]) -> T { + return values + .iter() + .copied() + .fold(A::default(), |mut a, b| { + a.accumulate(b); + a + }) + .finish(); +} + +#[inline(never)] +fn aggregate_nonnull_lanes, const LANES: usize>( + values: &[T], +) -> T { + let mut acc = [A::default(); LANES]; + let mut chunks = values.chunks_exact(LANES); + chunks.borrow_mut().for_each(|chunk| { + aggregate_nonnull_chunk(&mut acc, chunk[..LANES].try_into().unwrap()); + }); + + let remainder = chunks.remainder(); + if remainder.len() > 0 { + if remainder.len() > 0 { + for i in 0..remainder.len() { + acc[i].accumulate(remainder[i]); + } + } + } + + reduce_accumulators(acc).finish() +} + +#[inline(never)] +fn aggregate_nullable_lanes, const LANES: usize>( + values: &[T], + validity: &NullBuffer, +) -> T { + assert!(LANES > 0 && 64 % LANES == 0); + assert_eq!(values.len(), validity.len()); + + let mut acc = [A::default(); LANES]; + let mut values_chunks = values.chunks_exact(64); + let validity_chunks = validity.inner().bit_chunks(); + let mut validity_chunks_iter = validity_chunks.iter(); + + values_chunks.borrow_mut().for_each(|chunk| { + // Safety: we asserted that values and validity have the same length and trust the iterator impl + let mut validity = unsafe { validity_chunks_iter.next().unwrap_unchecked() }; + chunk.chunks_exact(LANES).for_each(|chunk| { + aggregate_nullable_chunk(&mut acc, chunk[..LANES].try_into().unwrap(), validity); + validity >>= LANES; + }); + }); + + let remainder = values_chunks.remainder(); + if remainder.len() > 0 { + let mut validity = validity_chunks.remainder_bits(); + + let mut remainder_chunks = remainder.chunks_exact(LANES); + remainder_chunks.borrow_mut().for_each(|chunk| { + aggregate_nullable_chunk(&mut acc, chunk[..LANES].try_into().unwrap(), validity); + validity >>= LANES; + }); + + let remainder = remainder_chunks.remainder(); + if remainder.len() > 0 { + let mut bit = 1; + for i in 0..remainder.len() { + acc[i].accumulate_nullable(remainder[i], (validity & bit) != 0); + bit <<= 1; + } + } + } + + reduce_accumulators(acc).finish() +} + +// The preferred vector size in bytes for the target platform. +// Note that the avx512 target feature is still unstable and this also means it is not detected on stable rust. +const PREFERRED_VECTOR_SIZE: usize = + if cfg!(all(target_arch = "x86_64", target_feature = "avx512f")) { + 64 + } else if cfg!(all(target_arch = "x86_64", target_feature = "avx")) { + 32 + } else { + 16 + }; + +// non-nullable aggregation requires fewer temporary registers so we can use more of them for accumulators +const PREFERRED_VECTOR_SIZE_NON_NULL: usize = PREFERRED_VECTOR_SIZE * 2; + +fn aggregate, A: NumericAccumulator>( + array: &PrimitiveArray

, +) -> Option { + let null_count = array.null_count(); + if null_count == array.len() { + return None; + } + let values = array.values().as_ref(); + match array.nulls() { + Some(nulls) if null_count > 0 => match PREFERRED_VECTOR_SIZE / std::mem::size_of::() { + 64 => Some(aggregate_nullable_lanes::(values, nulls)), + 32 => Some(aggregate_nullable_lanes::(values, nulls)), + 16 => Some(aggregate_nullable_lanes::(values, nulls)), + 8 => Some(aggregate_nullable_lanes::(values, nulls)), + 4 => Some(aggregate_nullable_lanes::(values, nulls)), + 2 => Some(aggregate_nullable_lanes::(values, nulls)), + _ => Some(aggregate_nullable_lanes::(values, nulls)), + }, + _ => { + let is_float = matches!( + array.data_type(), + DataType::Float16 | DataType::Float32 | DataType::Float64 + ); + if is_float { + match PREFERRED_VECTOR_SIZE_NON_NULL / std::mem::size_of::() { + 64 => Some(aggregate_nonnull_lanes::(values)), + 32 => Some(aggregate_nonnull_lanes::(values)), + 16 => Some(aggregate_nonnull_lanes::(values)), + 8 => Some(aggregate_nonnull_lanes::(values)), + 4 => Some(aggregate_nonnull_lanes::(values)), + 2 => Some(aggregate_nonnull_lanes::(values)), + _ => Some(aggregate_nonnull_lanes::(values)), + } + } else { + // for non-null integers its better to not chunk ourselves and instead + // let llvm fully handle loop unrolling and vectorization + Some(aggregate_nonnull_simple::(values)) + } + } + } } /// Returns the minimum value in the boolean array. @@ -230,7 +489,7 @@ where T: ArrowNumericType, T::Native: ArrowNativeType, { - min_max_array_helper::(array, |a, b| (is_nan(*a) & !is_nan(*b)) || a > b, min) + min_max_array_helper::(array, |a, b| a.is_gt(*b), min) } /// Returns the max of values in the array of `ArrowNumericType` type, or dictionary @@ -238,9 +497,9 @@ where pub fn max_array>(array: A) -> Option where T: ArrowNumericType, - T::Native: ArrowNativeType, + T::Native: ArrowNativeTypeOp, { - min_max_array_helper::(array, |a, b| (!is_nan(*a) & is_nan(*b)) || a < b, max) + min_max_array_helper::(array, |a, b| a.is_lt(*b), max) } fn min_max_array_helper, F, M>( @@ -259,66 +518,6 @@ where } } -/// Returns the sum of values in the primitive array. -/// -/// Returns `None` if the array is empty or only contains null values. -/// -/// This doesn't detect overflow. Once overflowing, the result will wrap around. -/// For an overflow-checking variant, use `sum_checked` instead. -#[cfg(not(feature = "simd"))] -pub fn sum(array: &PrimitiveArray) -> Option -where - T: ArrowNumericType, - T::Native: ArrowNativeTypeOp, -{ - let null_count = array.null_count(); - - if null_count == array.len() { - return None; - } - - let data: &[T::Native] = array.values(); - - match array.nulls() { - None => { - let sum = data.iter().fold(T::default_value(), |accumulator, value| { - accumulator.add_wrapping(*value) - }); - - Some(sum) - } - Some(nulls) => { - let mut sum = T::default_value(); - let data_chunks = data.chunks_exact(64); - let remainder = data_chunks.remainder(); - - let bit_chunks = nulls.inner().bit_chunks(); - data_chunks - .zip(bit_chunks.iter()) - .for_each(|(chunk, mask)| { - // index_mask has value 1 << i in the loop - let mut index_mask = 1; - chunk.iter().for_each(|value| { - if (mask & index_mask) != 0 { - sum = sum.add_wrapping(*value); - } - index_mask <<= 1; - }); - }); - - let remainder_bits = bit_chunks.remainder_bits(); - - remainder.iter().enumerate().for_each(|(i, value)| { - if remainder_bits & (1 << i) != 0 { - sum = sum.add_wrapping(*value); - } - }); - - Some(sum) - } - } -} - macro_rules! bit_operation { ($NAME:ident, $OP:ident, $NATIVE:ident, $DEFAULT:expr, $DOC:expr) => { #[doc = $DOC] @@ -476,369 +675,35 @@ where } } -#[cfg(feature = "simd")] -mod simd { - use super::is_nan; - use arrow_array::*; - use std::marker::PhantomData; - - pub(super) trait SimdAggregate { - type ScalarAccumulator; - type SimdAccumulator; - - /// Returns the accumulator for aggregating scalar values - fn init_accumulator_scalar() -> Self::ScalarAccumulator; - - /// Returns the accumulator for aggregating simd chunks of values - fn init_accumulator_chunk() -> Self::SimdAccumulator; - - /// Updates the accumulator with the values of one chunk - fn accumulate_chunk_non_null(accumulator: &mut Self::SimdAccumulator, chunk: T::Simd); - - /// Updates the accumulator with the values of one chunk according to the given vector mask - fn accumulate_chunk_nullable( - accumulator: &mut Self::SimdAccumulator, - chunk: T::Simd, - mask: T::SimdMask, - ); - - /// Updates the accumulator with one value - fn accumulate_scalar(accumulator: &mut Self::ScalarAccumulator, value: T::Native); - - /// Reduces the vector lanes of the simd accumulator and the scalar accumulator to a single value - fn reduce( - simd_accumulator: Self::SimdAccumulator, - scalar_accumulator: Self::ScalarAccumulator, - ) -> Option; - } - - pub(super) struct SumAggregate { - phantom: PhantomData, - } - - impl SimdAggregate for SumAggregate - where - T::Native: ArrowNativeTypeOp, - { - type ScalarAccumulator = T::Native; - type SimdAccumulator = T::Simd; - - fn init_accumulator_scalar() -> Self::ScalarAccumulator { - T::default_value() - } - - fn init_accumulator_chunk() -> Self::SimdAccumulator { - T::init(Self::init_accumulator_scalar()) - } - - fn accumulate_chunk_non_null(accumulator: &mut T::Simd, chunk: T::Simd) { - *accumulator = *accumulator + chunk; - } - - fn accumulate_chunk_nullable( - accumulator: &mut T::Simd, - chunk: T::Simd, - vecmask: T::SimdMask, - ) { - let zero = T::init(T::default_value()); - let blended = T::mask_select(vecmask, chunk, zero); - - *accumulator = *accumulator + blended; - } - - fn accumulate_scalar(accumulator: &mut T::Native, value: T::Native) { - *accumulator = accumulator.add_wrapping(value) - } - - fn reduce( - simd_accumulator: Self::SimdAccumulator, - scalar_accumulator: Self::ScalarAccumulator, - ) -> Option { - // we can't use T::lanes() as the slice len because it is not const, - // instead always reserve the maximum number of lanes - let mut tmp = [T::default_value(); 64]; - let slice = &mut tmp[0..T::lanes()]; - T::write(simd_accumulator, slice); - - let mut reduced = Self::init_accumulator_scalar(); - slice - .iter() - .for_each(|value| Self::accumulate_scalar(&mut reduced, *value)); - - Self::accumulate_scalar(&mut reduced, scalar_accumulator); - - // result can not be None because we checked earlier for the null count - Some(reduced) - } - } - - pub(super) struct MinAggregate { - phantom: PhantomData, - } - - impl SimdAggregate for MinAggregate - where - T::Native: PartialOrd, - { - type ScalarAccumulator = (T::Native, bool); - type SimdAccumulator = (T::Simd, T::SimdMask); - - fn init_accumulator_scalar() -> Self::ScalarAccumulator { - (T::default_value(), false) - } - - fn init_accumulator_chunk() -> Self::SimdAccumulator { - (T::init(T::default_value()), T::mask_init(false)) - } - - fn accumulate_chunk_non_null(accumulator: &mut Self::SimdAccumulator, chunk: T::Simd) { - let acc_is_nan = !T::eq(accumulator.0, accumulator.0); - let is_lt = acc_is_nan | T::lt(chunk, accumulator.0); - let first_or_lt = !accumulator.1 | is_lt; - - accumulator.0 = T::mask_select(first_or_lt, chunk, accumulator.0); - accumulator.1 = T::mask_init(true); - } - - fn accumulate_chunk_nullable( - accumulator: &mut Self::SimdAccumulator, - chunk: T::Simd, - vecmask: T::SimdMask, - ) { - let acc_is_nan = !T::eq(accumulator.0, accumulator.0); - let is_lt = vecmask & (acc_is_nan | T::lt(chunk, accumulator.0)); - let first_or_lt = !accumulator.1 | is_lt; - - accumulator.0 = T::mask_select(first_or_lt, chunk, accumulator.0); - accumulator.1 |= vecmask; - } - - fn accumulate_scalar(accumulator: &mut Self::ScalarAccumulator, value: T::Native) { - if !accumulator.1 { - accumulator.0 = value; - } else { - let acc_is_nan = is_nan(accumulator.0); - if acc_is_nan || value < accumulator.0 { - accumulator.0 = value - } - } - accumulator.1 = true - } - - fn reduce( - simd_accumulator: Self::SimdAccumulator, - scalar_accumulator: Self::ScalarAccumulator, - ) -> Option { - // we can't use T::lanes() as the slice len because it is not const, - // instead always reserve the maximum number of lanes - let mut tmp = [T::default_value(); 64]; - let slice = &mut tmp[0..T::lanes()]; - T::write(simd_accumulator.0, slice); - - let mut reduced = Self::init_accumulator_scalar(); - slice - .iter() - .enumerate() - .filter(|(i, _value)| T::mask_get(&simd_accumulator.1, *i)) - .for_each(|(_i, value)| Self::accumulate_scalar(&mut reduced, *value)); - - if scalar_accumulator.1 { - Self::accumulate_scalar(&mut reduced, scalar_accumulator.0); - } - - if reduced.1 { - Some(reduced.0) - } else { - None - } - } - } - - pub(super) struct MaxAggregate { - phantom: PhantomData, - } - - impl SimdAggregate for MaxAggregate - where - T::Native: PartialOrd, - { - type ScalarAccumulator = (T::Native, bool); - type SimdAccumulator = (T::Simd, T::SimdMask); - - fn init_accumulator_scalar() -> Self::ScalarAccumulator { - (T::default_value(), false) - } - - fn init_accumulator_chunk() -> Self::SimdAccumulator { - (T::init(T::default_value()), T::mask_init(false)) - } - - fn accumulate_chunk_non_null(accumulator: &mut Self::SimdAccumulator, chunk: T::Simd) { - let chunk_is_nan = !T::eq(chunk, chunk); - let is_gt = chunk_is_nan | T::gt(chunk, accumulator.0); - let first_or_gt = !accumulator.1 | is_gt; - - accumulator.0 = T::mask_select(first_or_gt, chunk, accumulator.0); - accumulator.1 = T::mask_init(true); - } - - fn accumulate_chunk_nullable( - accumulator: &mut Self::SimdAccumulator, - chunk: T::Simd, - vecmask: T::SimdMask, - ) { - let chunk_is_nan = !T::eq(chunk, chunk); - let is_gt = vecmask & (chunk_is_nan | T::gt(chunk, accumulator.0)); - let first_or_gt = !accumulator.1 | is_gt; - - accumulator.0 = T::mask_select(first_or_gt, chunk, accumulator.0); - accumulator.1 |= vecmask; - } - - fn accumulate_scalar(accumulator: &mut Self::ScalarAccumulator, value: T::Native) { - if !accumulator.1 { - accumulator.0 = value; - } else { - let value_is_nan = is_nan(value); - if value_is_nan || value > accumulator.0 { - accumulator.0 = value - } - } - accumulator.1 = true; - } - - fn reduce( - simd_accumulator: Self::SimdAccumulator, - scalar_accumulator: Self::ScalarAccumulator, - ) -> Option { - // we can't use T::lanes() as the slice len because it is not const, - // instead always reserve the maximum number of lanes - let mut tmp = [T::default_value(); 64]; - let slice = &mut tmp[0..T::lanes()]; - T::write(simd_accumulator.0, slice); - - let mut reduced = Self::init_accumulator_scalar(); - slice - .iter() - .enumerate() - .filter(|(i, _value)| T::mask_get(&simd_accumulator.1, *i)) - .for_each(|(_i, value)| Self::accumulate_scalar(&mut reduced, *value)); - - if scalar_accumulator.1 { - Self::accumulate_scalar(&mut reduced, scalar_accumulator.0); - } - - if reduced.1 { - Some(reduced.0) - } else { - None - } - } - } - - pub(super) fn simd_aggregation>( - array: &PrimitiveArray, - ) -> Option { - let null_count = array.null_count(); - - if null_count == array.len() { - return None; - } - - let data: &[T::Native] = array.values(); - - let mut chunk_acc = A::init_accumulator_chunk(); - let mut rem_acc = A::init_accumulator_scalar(); - - match array.nulls() { - None => { - let data_chunks = data.chunks_exact(64); - let remainder = data_chunks.remainder(); - - data_chunks.for_each(|chunk| { - chunk.chunks_exact(T::lanes()).for_each(|chunk| { - let chunk = T::load(&chunk); - A::accumulate_chunk_non_null(&mut chunk_acc, chunk); - }); - }); - - remainder.iter().for_each(|value| { - A::accumulate_scalar(&mut rem_acc, *value); - }); - } - Some(nulls) => { - // process data in chunks of 64 elements since we also get 64 bits of validity information at a time - let data_chunks = data.chunks_exact(64); - let remainder = data_chunks.remainder(); - - let bit_chunks = nulls.inner().bit_chunks(); - let remainder_bits = bit_chunks.remainder_bits(); - - data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| { - // split chunks further into slices corresponding to the vector length - // the compiler is able to unroll this inner loop and remove bounds checks - // since the outer chunk size (64) is always a multiple of the number of lanes - chunk.chunks_exact(T::lanes()).for_each(|chunk| { - let vecmask = T::mask_from_u64(mask); - let chunk = T::load(&chunk); - - A::accumulate_chunk_nullable(&mut chunk_acc, chunk, vecmask); - - // skip the shift and avoid overflow for u8 type, which uses 64 lanes. - mask >>= T::lanes() % 64; - }); - }); - - remainder.iter().enumerate().for_each(|(i, value)| { - if remainder_bits & (1 << i) != 0 { - A::accumulate_scalar(&mut rem_acc, *value) - } - }); - } - } - - A::reduce(chunk_acc, rem_acc) - } -} - /// Returns the sum of values in the primitive array. /// /// Returns `None` if the array is empty or only contains null values. /// /// This doesn't detect overflow in release mode by default. Once overflowing, the result will /// wrap around. For an overflow-checking variant, use `sum_checked` instead. -#[cfg(feature = "simd")] pub fn sum(array: &PrimitiveArray) -> Option where T::Native: ArrowNativeTypeOp, { - use simd::*; - - simd::simd_aggregation::>(&array) + aggregate::>(array) } -#[cfg(feature = "simd")] /// Returns the minimum value in the array, according to the natural order. /// For floating point arrays any NaN values are considered to be greater than any other non-null value pub fn min(array: &PrimitiveArray) -> Option where T::Native: PartialOrd, { - use simd::*; - - simd::simd_aggregation::>(&array) + aggregate::>(array) } -#[cfg(feature = "simd")] /// Returns the maximum value in the array, according to the natural order. /// For floating point arrays any NaN values are considered to be greater than any other non-null value pub fn max(array: &PrimitiveArray) -> Option where T::Native: PartialOrd, { - use simd::*; - - simd::simd_aggregation::>(&array) + aggregate::>(array) } #[cfg(test)] @@ -1455,7 +1320,6 @@ mod tests { } #[test] - #[cfg(not(feature = "simd"))] fn test_sum_overflow() { let a = Int32Array::from(vec![i32::MAX, 1]); diff --git a/arrow-array/src/arithmetic.rs b/arrow-array/src/arithmetic.rs index c9be39d4414..d6e2e04faa8 100644 --- a/arrow-array/src/arithmetic.rs +++ b/arrow-array/src/arithmetic.rs @@ -45,6 +45,19 @@ pub trait ArrowNativeTypeOp: ArrowNativeType { /// The multiplicative identity const ONE: Self; + /// The minimum value and identity for the `max` aggregation. + /// Note that the aggregation uses the total order predicate for floating point values, + /// which means that this value is a negative NaN. + const MIN: Self; + + /// The maximum value and identity for the `min` aggregation. + /// Note that the aggregation uses the total order predicate for floating point values, + /// which means that this value is a positive NaN. + const MAX: Self; + + /// The number of bytes occupied by this type + const BYTES: usize; + /// Checked addition operation fn add_checked(self, rhs: Self) -> Result; @@ -129,12 +142,15 @@ pub trait ArrowNativeTypeOp: ArrowNativeType { macro_rules! native_type_op { ($t:tt) => { - native_type_op!($t, 0, 1); + native_type_op!($t, 0, 1, $t::MIN, $t::MAX); }; - ($t:tt, $zero:expr, $one: expr) => { + ($t:tt, $zero:expr, $one: expr, $min: expr, $max: expr) => { impl ArrowNativeTypeOp for $t { const ZERO: Self = $zero; const ONE: Self = $one; + const MIN: Self = $min; + const MAX: Self = $max; + const BYTES: usize = std::mem::size_of::(); #[inline] fn add_checked(self, rhs: Self) -> Result { @@ -270,13 +286,57 @@ native_type_op!(u8); native_type_op!(u16); native_type_op!(u32); native_type_op!(u64); -native_type_op!(i256, i256::ZERO, i256::ONE); +native_type_op!(i256, i256::ZERO, i256::ONE, i256::MIN, i256::MAX); + +/* +trait ToTotalOrder { + type TotalOrder; + fn to_total_order(self) -> Self::TotalOrder; +} + +impl ToTotalOrder for f64 { + type TotalOrder = u64; + + #[inline] + fn to_total_order(self) -> Self::TotalOrder { + // reading via integer pointer instead of calling to_bits seems to avoid a move from xmm to gp reg + // let bits = unsafe { (self as *const f64 as *const u64).read() }; + let bits = self.to_bits(); + (bits ^ ((bits as i64 >> 63) as u64 >> 1)) ^ (1 << 63) + } +} + +impl ToTotalOrder for f32 { + type TotalOrder = u32; + + #[inline] + fn to_total_order(self) -> Self::TotalOrder { + // reading via integer pointer instead of calling to_bits seems to avoid a move from xmm to gp reg + // let bits = unsafe { (self as *const f32 as *const u32).read() }; + let bits = self.to_bits(); + (bits ^ ((bits as i32 >> 31) as u32 >> 1)) ^ (1 << 31) + } +} + +impl ToTotalOrder for f16 { + type TotalOrder = u16; + + #[inline] + fn to_total_order(self) -> Self::TotalOrder { + let bits = self.to_bits(); + (bits ^ ((bits as i16 >> 15) as u16 >> 1)) ^ (1 << 15) + } +} +*/ macro_rules! native_type_float_op { - ($t:tt, $zero:expr, $one:expr) => { + ($t:tt, $zero:expr, $one:expr, $min:expr, $max:expr, $bits:ty) => { impl ArrowNativeTypeOp for $t { const ZERO: Self = $zero; const ONE: Self = $one; + const MIN: Self = $min; + const MAX: Self = $max; + const BYTES: usize = std::mem::size_of::(); #[inline] fn add_checked(self, rhs: Self) -> Result { @@ -377,9 +437,16 @@ macro_rules! native_type_float_op { }; } -native_type_float_op!(f16, f16::ZERO, f16::ONE); -native_type_float_op!(f32, 0., 1.); -native_type_float_op!(f64, 0., 1.); +native_type_float_op!( + f16, + f16::ZERO, + f16::ONE, + f16::from_bits(f16::NAN.to_bits() ^ 0x8000), + f16::NAN, + u16 +); +native_type_float_op!(f32, 0., 1., -f32::NAN, f32::NAN, u32); +native_type_float_op!(f64, 0., 1., -f64::NAN, f64::NAN, u64); #[cfg(test)] mod tests { @@ -780,4 +847,16 @@ mod tests { assert_eq!(8.0_f32.pow_checked(2_u32).unwrap(), 64_f32); assert_eq!(8.0_f64.pow_checked(2_u32).unwrap(), 64_f64); } + + #[test] + fn test_float_total_order_identity() { + assert!(::MIN.is_lt(f64::NEG_INFINITY)); + assert!(::MAX.is_gt(f64::INFINITY)); + + assert!(::MIN.is_lt(f32::NEG_INFINITY)); + assert!(::MAX.is_gt(f32::INFINITY)); + + assert!(::MIN.is_lt(f16::NEG_INFINITY)); + assert!(::MAX.is_gt(f16::INFINITY)); + } } diff --git a/arrow-buffer/src/buffer/boolean.rs b/arrow-buffer/src/buffer/boolean.rs index c651edcad92..1589cc5b102 100644 --- a/arrow-buffer/src/buffer/boolean.rs +++ b/arrow-buffer/src/buffer/boolean.rs @@ -90,6 +90,7 @@ impl BooleanBuffer { /// Returns a `BitChunks` instance which can be used to iterate over /// this buffer's bits in `u64` chunks + #[inline] pub fn bit_chunks(&self) -> BitChunks { BitChunks::new(self.values(), self.offset, self.len) } From d04849a77515d2962c63bf2b878a914e1d9805d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Mon, 20 Nov 2023 22:56:38 +0100 Subject: [PATCH 2/7] Comments and cleanup --- arrow-arith/src/aggregate.rs | 58 +++++++++++++++++++++++------------ arrow-array/src/arithmetic.rs | 46 --------------------------- 2 files changed, 38 insertions(+), 66 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index 64233753084..321e96e0c14 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -27,6 +27,21 @@ use arrow_schema::*; use std::borrow::BorrowMut; use std::ops::{BitAnd, BitOr, BitXor}; +/// An accumulator for primitive numeric values. +trait NumericAccumulator: Copy + Default { + /// Accumulate a non-null value. + fn accumulate(&mut self, value: T); + /// Accumulate a nullable values. + /// If `valid` is false the `value` should not affect the accumulator state. + fn accumulate_nullable(&mut self, value: T, valid: bool); + /// Merge another accumulator into this accumulator + fn merge(&mut self, other: Self); + /// Return the aggregated value. + fn finish(&mut self) -> T; +} + +/// Helper for branchlessly selecting either `a` or `b` based on the boolean `m`. +/// After verifying the generated assembly this can be a simple `if`. #[inline(always)] fn select(m: bool, a: T, b: T) -> T { if m { @@ -36,13 +51,6 @@ fn select(m: bool, a: T, b: T) -> T { } } -trait NumericAccumulator: Copy + Default { - fn accumulate(&mut self, value: T); - fn accumulate_nullable(&mut self, value: T, valid: bool); - fn merge(&mut self, other: Self); - fn finish(&mut self) -> T; -} - #[derive(Clone, Copy)] struct SumAccumulator { sum: T, @@ -195,6 +203,7 @@ fn aggregate_nonnull_simple>(valu fn aggregate_nonnull_lanes, const LANES: usize>( values: &[T], ) -> T { + // aggregate into multiple independent accumulators allows the compiler to use vector registers let mut acc = [A::default(); LANES]; let mut chunks = values.chunks_exact(LANES); chunks.borrow_mut().for_each(|chunk| { @@ -221,7 +230,9 @@ fn aggregate_nullable_lanes, cons assert!(LANES > 0 && 64 % LANES == 0); assert_eq!(values.len(), validity.len()); + // aggregate into multiple independent accumulators allows the compiler to use vector registers let mut acc = [A::default(); LANES]; + // we process 64 bits of validity at a time let mut values_chunks = values.chunks_exact(64); let validity_chunks = validity.inner().bit_chunks(); let mut validity_chunks_iter = validity_chunks.iter(); @@ -229,6 +240,7 @@ fn aggregate_nullable_lanes, cons values_chunks.borrow_mut().for_each(|chunk| { // Safety: we asserted that values and validity have the same length and trust the iterator impl let mut validity = unsafe { validity_chunks_iter.next().unwrap_unchecked() }; + // chunk further based on the number of vector lanes chunk.chunks_exact(LANES).for_each(|chunk| { aggregate_nullable_chunk(&mut acc, chunk[..LANES].try_into().unwrap(), validity); validity >>= LANES; @@ -258,8 +270,8 @@ fn aggregate_nullable_lanes, cons reduce_accumulators(acc).finish() } -// The preferred vector size in bytes for the target platform. -// Note that the avx512 target feature is still unstable and this also means it is not detected on stable rust. +/// The preferred vector size in bytes for the target platform. +/// Note that the avx512 target feature is still unstable and this also means it is not detected on stable rust. const PREFERRED_VECTOR_SIZE: usize = if cfg!(all(target_arch = "x86_64", target_feature = "avx512f")) { 64 @@ -269,9 +281,11 @@ const PREFERRED_VECTOR_SIZE: usize = 16 }; -// non-nullable aggregation requires fewer temporary registers so we can use more of them for accumulators +/// non-nullable aggregation requires fewer temporary registers so we can use more of them for accumulators const PREFERRED_VECTOR_SIZE_NON_NULL: usize = PREFERRED_VECTOR_SIZE * 2; +/// Generic aggregation for any primitive type. +/// Returns None if there are no non-null values in `array`. fn aggregate, A: NumericAccumulator>( array: &PrimitiveArray

, ) -> Option { @@ -281,15 +295,19 @@ fn aggregate, A: Numeric } let values = array.values().as_ref(); match array.nulls() { - Some(nulls) if null_count > 0 => match PREFERRED_VECTOR_SIZE / std::mem::size_of::() { - 64 => Some(aggregate_nullable_lanes::(values, nulls)), - 32 => Some(aggregate_nullable_lanes::(values, nulls)), - 16 => Some(aggregate_nullable_lanes::(values, nulls)), - 8 => Some(aggregate_nullable_lanes::(values, nulls)), - 4 => Some(aggregate_nullable_lanes::(values, nulls)), - 2 => Some(aggregate_nullable_lanes::(values, nulls)), - _ => Some(aggregate_nullable_lanes::(values, nulls)), - }, + Some(nulls) if null_count > 0 => { + // const generics depending on a generic type parameter are not supported + // so we have to match and call aggregate with the corresponding constant + match PREFERRED_VECTOR_SIZE / std::mem::size_of::() { + 64 => Some(aggregate_nullable_lanes::(values, nulls)), + 32 => Some(aggregate_nullable_lanes::(values, nulls)), + 16 => Some(aggregate_nullable_lanes::(values, nulls)), + 8 => Some(aggregate_nullable_lanes::(values, nulls)), + 4 => Some(aggregate_nullable_lanes::(values, nulls)), + 2 => Some(aggregate_nullable_lanes::(values, nulls)), + _ => Some(aggregate_nullable_lanes::(values, nulls)), + } + } _ => { let is_float = matches!( array.data_type(), @@ -303,7 +321,7 @@ fn aggregate, A: Numeric 8 => Some(aggregate_nonnull_lanes::(values)), 4 => Some(aggregate_nonnull_lanes::(values)), 2 => Some(aggregate_nonnull_lanes::(values)), - _ => Some(aggregate_nonnull_lanes::(values)), + _ => Some(aggregate_nonnull_simple::(values)), } } else { // for non-null integers its better to not chunk ourselves and instead diff --git a/arrow-array/src/arithmetic.rs b/arrow-array/src/arithmetic.rs index d6e2e04faa8..20e98aaa257 100644 --- a/arrow-array/src/arithmetic.rs +++ b/arrow-array/src/arithmetic.rs @@ -55,9 +55,6 @@ pub trait ArrowNativeTypeOp: ArrowNativeType { /// which means that this value is a positive NaN. const MAX: Self; - /// The number of bytes occupied by this type - const BYTES: usize; - /// Checked addition operation fn add_checked(self, rhs: Self) -> Result; @@ -150,7 +147,6 @@ macro_rules! native_type_op { const ONE: Self = $one; const MIN: Self = $min; const MAX: Self = $max; - const BYTES: usize = std::mem::size_of::(); #[inline] fn add_checked(self, rhs: Self) -> Result { @@ -288,47 +284,6 @@ native_type_op!(u32); native_type_op!(u64); native_type_op!(i256, i256::ZERO, i256::ONE, i256::MIN, i256::MAX); -/* -trait ToTotalOrder { - type TotalOrder; - fn to_total_order(self) -> Self::TotalOrder; -} - -impl ToTotalOrder for f64 { - type TotalOrder = u64; - - #[inline] - fn to_total_order(self) -> Self::TotalOrder { - // reading via integer pointer instead of calling to_bits seems to avoid a move from xmm to gp reg - // let bits = unsafe { (self as *const f64 as *const u64).read() }; - let bits = self.to_bits(); - (bits ^ ((bits as i64 >> 63) as u64 >> 1)) ^ (1 << 63) - } -} - -impl ToTotalOrder for f32 { - type TotalOrder = u32; - - #[inline] - fn to_total_order(self) -> Self::TotalOrder { - // reading via integer pointer instead of calling to_bits seems to avoid a move from xmm to gp reg - // let bits = unsafe { (self as *const f32 as *const u32).read() }; - let bits = self.to_bits(); - (bits ^ ((bits as i32 >> 31) as u32 >> 1)) ^ (1 << 31) - } -} - -impl ToTotalOrder for f16 { - type TotalOrder = u16; - - #[inline] - fn to_total_order(self) -> Self::TotalOrder { - let bits = self.to_bits(); - (bits ^ ((bits as i16 >> 15) as u16 >> 1)) ^ (1 << 15) - } -} -*/ - macro_rules! native_type_float_op { ($t:tt, $zero:expr, $one:expr, $min:expr, $max:expr, $bits:ty) => { impl ArrowNativeTypeOp for $t { @@ -336,7 +291,6 @@ macro_rules! native_type_float_op { const ONE: Self = $one; const MIN: Self = $min; const MAX: Self = $max; - const BYTES: usize = std::mem::size_of::(); #[inline] fn add_checked(self, rhs: Self) -> Result { From 9b0b727a2a030fbe157f65139555a984133de730 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Mon, 20 Nov 2023 23:23:48 +0100 Subject: [PATCH 3/7] Clippy fixes --- arrow-arith/src/aggregate.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index 321e96e0c14..e2d9bb1fccd 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -203,7 +203,8 @@ fn aggregate_nonnull_simple>(valu fn aggregate_nonnull_lanes, const LANES: usize>( values: &[T], ) -> T { - // aggregate into multiple independent accumulators allows the compiler to use vector registers + // aggregating into multiple independent accumulators allows the compiler to use vector registers + // with a single accumulator the compiler would not be allowed to reorder floating point addition let mut acc = [A::default(); LANES]; let mut chunks = values.chunks_exact(LANES); chunks.borrow_mut().for_each(|chunk| { @@ -211,12 +212,8 @@ fn aggregate_nonnull_lanes, const }); let remainder = chunks.remainder(); - if remainder.len() > 0 { - if remainder.len() > 0 { - for i in 0..remainder.len() { - acc[i].accumulate(remainder[i]); - } - } + for i in 0..remainder.len() { + acc[i].accumulate(remainder[i]); } reduce_accumulators(acc).finish() @@ -230,7 +227,7 @@ fn aggregate_nullable_lanes, cons assert!(LANES > 0 && 64 % LANES == 0); assert_eq!(values.len(), validity.len()); - // aggregate into multiple independent accumulators allows the compiler to use vector registers + // aggregating into multiple independent accumulators allows the compiler to use vector registers let mut acc = [A::default(); LANES]; // we process 64 bits of validity at a time let mut values_chunks = values.chunks_exact(64); @@ -248,7 +245,7 @@ fn aggregate_nullable_lanes, cons }); let remainder = values_chunks.remainder(); - if remainder.len() > 0 { + if !remainder.is_empty() { let mut validity = validity_chunks.remainder_bits(); let mut remainder_chunks = remainder.chunks_exact(LANES); @@ -258,7 +255,7 @@ fn aggregate_nullable_lanes, cons }); let remainder = remainder_chunks.remainder(); - if remainder.len() > 0 { + if !remainder.is_empty() { let mut bit = 1; for i in 0..remainder.len() { acc[i].accumulate_nullable(remainder[i], (validity & bit) != 0); From 739c52cbca71e6508899686ac8987b4b9bee44d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Tue, 21 Nov 2023 23:14:44 +0100 Subject: [PATCH 4/7] Use largest/smallest bit patterns for float MIN/MAX constants, these differ from the canonical NAN bit pattern --- arrow-array/src/arithmetic.rs | 52 ++++++++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/arrow-array/src/arithmetic.rs b/arrow-array/src/arithmetic.rs index 20e98aaa257..d8f3e750155 100644 --- a/arrow-array/src/arithmetic.rs +++ b/arrow-array/src/arithmetic.rs @@ -285,7 +285,7 @@ native_type_op!(u64); native_type_op!(i256, i256::ZERO, i256::ONE, i256::MIN, i256::MAX); macro_rules! native_type_float_op { - ($t:tt, $zero:expr, $one:expr, $min:expr, $max:expr, $bits:ty) => { + ($t:tt, $zero:expr, $one:expr, $min:expr, $max:expr) => { impl ArrowNativeTypeOp for $t { const ZERO: Self = $zero; const ONE: Self = $one; @@ -391,16 +391,30 @@ macro_rules! native_type_float_op { }; } +// the smallest/largest bit patterns for floating point numbers are NaN, but differ from the canonical NAN constants. +// See test_float_total_order_min_max for details. native_type_float_op!( f16, f16::ZERO, f16::ONE, - f16::from_bits(f16::NAN.to_bits() ^ 0x8000), - f16::NAN, - u16 + f16::from_bits(-1 as _), + f16::from_bits(i16::MAX as _) +); +// from_bits is not yet stable as const fn, see https://github.com/rust-lang/rust/issues/72447 +native_type_float_op!( + f32, + 0., + 1., + unsafe { std::mem::transmute(-1_i32) }, + unsafe { std::mem::transmute(i32::MAX) } +); +native_type_float_op!( + f64, + 0., + 1., + unsafe { std::mem::transmute(-1_i64) }, + unsafe { std::mem::transmute(i64::MAX) } ); -native_type_float_op!(f32, 0., 1., -f32::NAN, f32::NAN, u32); -native_type_float_op!(f64, 0., 1., -f64::NAN, f64::NAN, u64); #[cfg(test)] mod tests { @@ -803,14 +817,38 @@ mod tests { } #[test] - fn test_float_total_order_identity() { + fn test_float_total_order_min_max() { assert!(::MIN.is_lt(f64::NEG_INFINITY)); assert!(::MAX.is_gt(f64::INFINITY)); + assert!(::MIN.is_nan()); + assert!(::MIN.is_sign_negative()); + assert!(::MIN.is_lt(-f64::NAN)); + + assert!(::MAX.is_nan()); + assert!(::MAX.is_sign_positive()); + assert!(::MAX.is_gt(f64::NAN)); + assert!(::MIN.is_lt(f32::NEG_INFINITY)); assert!(::MAX.is_gt(f32::INFINITY)); + assert!(::MIN.is_nan()); + assert!(::MIN.is_sign_negative()); + assert!(::MIN.is_lt(-f32::NAN)); + + assert!(::MAX.is_nan()); + assert!(::MAX.is_sign_positive()); + assert!(::MAX.is_gt(f32::NAN)); + assert!(::MIN.is_lt(f16::NEG_INFINITY)); assert!(::MAX.is_gt(f16::INFINITY)); + + assert!(::MIN.is_nan()); + assert!(::MIN.is_sign_negative()); + assert!(::MIN.is_lt(-f16::NAN)); + + assert!(::MAX.is_nan()); + assert!(::MAX.is_sign_positive()); + assert!(::MAX.is_gt(f16::NAN)); } } From 10df385dd9b1d438d8fee5e204836ee6ea956972 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Tue, 21 Nov 2023 23:46:17 +0100 Subject: [PATCH 5/7] Add test coverage for aggregating large non-null and float inputs --- arrow-arith/src/aggregate.rs | 57 ++++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 3 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index e2d9bb1fccd..b2562d75ffa 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -752,8 +752,41 @@ mod tests { assert_eq!(None, sum(&a)); } + #[test] + fn test_primitive_array_sum_large_float_64() { + let c = Float64Array::new((1..=100).map(|x| x as f64).collect(), None); + assert_eq!(Some((1..=100).sum::() as f64), sum(&c)); + + // create an array that actually has non-zero values at the invalid indices + let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect()); + let c = Float64Array::new((1..=100).map(|x| x as f64).collect(), Some(validity)); + + assert_eq!( + Some((1..=100).filter(|i| i % 3 == 0).sum::() as f64), + sum(&c) + ); + } + + #[test] + fn test_primitive_array_sum_large_float_32() { + let c = Float32Array::new((1..=100).map(|x| x as f32).collect(), None); + assert_eq!(Some((1..=100).sum::() as f32), sum(&c)); + + // create an array that actually has non-zero values at the invalid indices + let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect()); + let c = Float32Array::new((1..=100).map(|x| x as f32).collect(), Some(validity)); + + assert_eq!( + Some((1..=100).filter(|i| i % 3 == 0).sum::() as f32), + sum(&c) + ); + } + #[test] fn test_primitive_array_sum_large_64() { + let c = Int64Array::new((1..=100).collect(), None); + assert_eq!(Some((1..=100).sum()), sum(&c)); + // create an array that actually has non-zero values at the invalid indices let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect()); let c = Int64Array::new((1..=100).collect(), Some(validity)); @@ -763,6 +796,9 @@ mod tests { #[test] fn test_primitive_array_sum_large_32() { + let c = Int32Array::new((1..=100).collect(), None); + assert_eq!(Some((1..=100).sum()), sum(&c)); + // create an array that actually has non-zero values at the invalid indices let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect()); let c = Int32Array::new((1..=100).collect(), Some(validity)); @@ -771,6 +807,9 @@ mod tests { #[test] fn test_primitive_array_sum_large_16() { + let c = Int16Array::new((1..=100).collect(), None); + assert_eq!(Some((1..=100).sum()), sum(&c)); + // create an array that actually has non-zero values at the invalid indices let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect()); let c = Int16Array::new((1..=100).collect(), Some(validity)); @@ -779,11 +818,23 @@ mod tests { #[test] fn test_primitive_array_sum_large_8() { - // include fewer values than other large tests so the result does not overflow the u8 + let c = UInt8Array::new((1..=100).collect(), None); + assert_eq!( + Some((1..=100).fold(0_u8, |a, x| a.wrapping_add(x))), + sum(&c) + ); + // create an array that actually has non-zero values at the invalid indices - let validity = NullBuffer::new((1..=100).map(|x| x % 33 == 0).collect()); + let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect()); let c = UInt8Array::new((1..=100).collect(), Some(validity)); - assert_eq!(Some((1..=100).filter(|i| i % 33 == 0).sum()), sum(&c)); + assert_eq!( + Some( + (1..=100) + .filter(|i| i % 3 == 0) + .fold(0_u8, |a, x| a.wrapping_add(x)) + ), + sum(&c) + ); } #[test] From 6a6b5f55f45c0956cb6cc13ed4fbffbaaf38b2c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Tue, 21 Nov 2023 23:55:26 +0100 Subject: [PATCH 6/7] Add test with negative NaN --- arrow-arith/src/aggregate.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index b2562d75ffa..ffbdc54c4ac 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -1034,6 +1034,19 @@ mod tests { assert!(min(&a).unwrap().is_nan()); } + #[test] + fn test_primitive_min_max_float_negative_nan() { + let a: Float64Array = + Float64Array::from(vec![f64::NEG_INFINITY, f64::NAN, f64::INFINITY, -f64::NAN]); + let max = max(&a).unwrap(); + let min = min(&a).unwrap(); + assert!(max.is_nan()); + assert!(max.is_sign_positive()); + + assert!(min.is_nan()); + assert!(min.is_sign_negative()); + } + #[test] fn test_primitive_min_max_float_first_nan_nonnull() { let a: Float64Array = (0..100) From 175c7996dd0829605ed67f93027c6351667f8422 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Tue, 21 Nov 2023 23:57:40 +0100 Subject: [PATCH 7/7] Rename MIN/MAX constants to make it explicit they use the total order relation --- arrow-arith/src/aggregate.rs | 8 +++-- arrow-array/src/arithmetic.rs | 60 +++++++++++++++++------------------ 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index ffbdc54c4ac..20ff0711d73 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -88,7 +88,9 @@ struct MinAccumulator { impl Default for MinAccumulator { fn default() -> Self { - Self { min: T::MAX } + Self { + min: T::MAX_TOTAL_ORDER, + } } } @@ -120,7 +122,9 @@ struct MaxAccumulator { impl Default for MaxAccumulator { fn default() -> Self { - Self { max: T::MIN } + Self { + max: T::MIN_TOTAL_ORDER, + } } } diff --git a/arrow-array/src/arithmetic.rs b/arrow-array/src/arithmetic.rs index d8f3e750155..59053619030 100644 --- a/arrow-array/src/arithmetic.rs +++ b/arrow-array/src/arithmetic.rs @@ -48,12 +48,12 @@ pub trait ArrowNativeTypeOp: ArrowNativeType { /// The minimum value and identity for the `max` aggregation. /// Note that the aggregation uses the total order predicate for floating point values, /// which means that this value is a negative NaN. - const MIN: Self; + const MIN_TOTAL_ORDER: Self; /// The maximum value and identity for the `min` aggregation. /// Note that the aggregation uses the total order predicate for floating point values, /// which means that this value is a positive NaN. - const MAX: Self; + const MAX_TOTAL_ORDER: Self; /// Checked addition operation fn add_checked(self, rhs: Self) -> Result; @@ -145,8 +145,8 @@ macro_rules! native_type_op { impl ArrowNativeTypeOp for $t { const ZERO: Self = $zero; const ONE: Self = $one; - const MIN: Self = $min; - const MAX: Self = $max; + const MIN_TOTAL_ORDER: Self = $min; + const MAX_TOTAL_ORDER: Self = $max; #[inline] fn add_checked(self, rhs: Self) -> Result { @@ -289,8 +289,8 @@ macro_rules! native_type_float_op { impl ArrowNativeTypeOp for $t { const ZERO: Self = $zero; const ONE: Self = $one; - const MIN: Self = $min; - const MAX: Self = $max; + const MIN_TOTAL_ORDER: Self = $min; + const MAX_TOTAL_ORDER: Self = $max; #[inline] fn add_checked(self, rhs: Self) -> Result { @@ -818,37 +818,37 @@ mod tests { #[test] fn test_float_total_order_min_max() { - assert!(::MIN.is_lt(f64::NEG_INFINITY)); - assert!(::MAX.is_gt(f64::INFINITY)); + assert!(::MIN_TOTAL_ORDER.is_lt(f64::NEG_INFINITY)); + assert!(::MAX_TOTAL_ORDER.is_gt(f64::INFINITY)); - assert!(::MIN.is_nan()); - assert!(::MIN.is_sign_negative()); - assert!(::MIN.is_lt(-f64::NAN)); + assert!(::MIN_TOTAL_ORDER.is_nan()); + assert!(::MIN_TOTAL_ORDER.is_sign_negative()); + assert!(::MIN_TOTAL_ORDER.is_lt(-f64::NAN)); - assert!(::MAX.is_nan()); - assert!(::MAX.is_sign_positive()); - assert!(::MAX.is_gt(f64::NAN)); + assert!(::MAX_TOTAL_ORDER.is_nan()); + assert!(::MAX_TOTAL_ORDER.is_sign_positive()); + assert!(::MAX_TOTAL_ORDER.is_gt(f64::NAN)); - assert!(::MIN.is_lt(f32::NEG_INFINITY)); - assert!(::MAX.is_gt(f32::INFINITY)); + assert!(::MIN_TOTAL_ORDER.is_lt(f32::NEG_INFINITY)); + assert!(::MAX_TOTAL_ORDER.is_gt(f32::INFINITY)); - assert!(::MIN.is_nan()); - assert!(::MIN.is_sign_negative()); - assert!(::MIN.is_lt(-f32::NAN)); + assert!(::MIN_TOTAL_ORDER.is_nan()); + assert!(::MIN_TOTAL_ORDER.is_sign_negative()); + assert!(::MIN_TOTAL_ORDER.is_lt(-f32::NAN)); - assert!(::MAX.is_nan()); - assert!(::MAX.is_sign_positive()); - assert!(::MAX.is_gt(f32::NAN)); + assert!(::MAX_TOTAL_ORDER.is_nan()); + assert!(::MAX_TOTAL_ORDER.is_sign_positive()); + assert!(::MAX_TOTAL_ORDER.is_gt(f32::NAN)); - assert!(::MIN.is_lt(f16::NEG_INFINITY)); - assert!(::MAX.is_gt(f16::INFINITY)); + assert!(::MIN_TOTAL_ORDER.is_lt(f16::NEG_INFINITY)); + assert!(::MAX_TOTAL_ORDER.is_gt(f16::INFINITY)); - assert!(::MIN.is_nan()); - assert!(::MIN.is_sign_negative()); - assert!(::MIN.is_lt(-f16::NAN)); + assert!(::MIN_TOTAL_ORDER.is_nan()); + assert!(::MIN_TOTAL_ORDER.is_sign_negative()); + assert!(::MIN_TOTAL_ORDER.is_lt(-f16::NAN)); - assert!(::MAX.is_nan()); - assert!(::MAX.is_sign_positive()); - assert!(::MAX.is_gt(f16::NAN)); + assert!(::MAX_TOTAL_ORDER.is_nan()); + assert!(::MAX_TOTAL_ORDER.is_sign_positive()); + assert!(::MAX_TOTAL_ORDER.is_gt(f16::NAN)); } }