From 0e41362c986272d7f12595e3fed698af3319f47e Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sat, 22 Jul 2023 12:24:10 +0000 Subject: [PATCH 01/10] almost same performance for null as well (72ns simd or 83ns for auto-vectorized) --- arrow-arith/src/aggregate.rs | 74 +++++++++++++++++++++++------- arrow/benches/aggregate_kernels.rs | 6 +++ 2 files changed, 63 insertions(+), 17 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index 04417c666c8..e09365d0125 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -273,8 +273,8 @@ where /// /// This doesn't detect overflow. Once overflowing, the result will wrap around. /// For an overflow-checking variant, use `sum_checked` instead. -#[cfg(not(feature = "simd"))] -pub fn sum(array: &PrimitiveArray) -> Option +// #[cfg(not(feature = "simd"))] +pub fn sum_test(array: &PrimitiveArray) -> Option where T: ArrowNumericType, T::Native: ArrowNativeTypeOp, @@ -286,42 +286,82 @@ where } let data: &[T::Native] = array.values(); + const LANES: usize = 16; + let mut chunk_acc = [T::default_value(); LANES]; + let mut rem_acc = T::default_value(); match array.nulls() { None => { - let sum = data.iter().fold(T::default_value(), |accumulator, value| { - accumulator.add_wrapping(*value) + let data_chunks = data.chunks_exact(64); + let remainder = data_chunks.remainder(); + + data_chunks.for_each(|chunk| { + chunk.chunks_exact(LANES).for_each(|chunk| { + let chunk: [T::Native; LANES] = chunk.try_into().unwrap(); + + for i in 0..LANES { + chunk_acc[i] = chunk_acc[i].add_wrapping(chunk[i]); + } + }) }); + remainder.iter().copied().for_each(|value| { + rem_acc = rem_acc.add_wrapping(value); + }); + + let mut reduced = T::default_value(); + for i in 0..LANES { + reduced = reduced.add_wrapping(chunk_acc[i]); + } + let sum = reduced.add_wrapping(rem_acc); + Some(sum) } Some(nulls) => { - let mut sum = T::default_value(); + // let mut sum = T::default_value(); + // process data in chunks of 64 elements since we also get 64 bits of validity information at a time let data_chunks = data.chunks_exact(64); let remainder = data_chunks.remainder(); let bit_chunks = nulls.inner().bit_chunks(); - data_chunks - .zip(bit_chunks.iter()) - .for_each(|(chunk, mask)| { - // index_mask has value 1 << i in the loop - let mut index_mask = 1; - chunk.iter().for_each(|value| { - if (mask & index_mask) != 0 { - sum = sum.add_wrapping(*value); + let remainder_bits = bit_chunks.remainder_bits(); + + data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| { + // index_mask has value 1 << i in the loop + let mut index_mask = 1; + // split chunks further into slices corresponding to the vector length + // the compiler is able to unroll this inner loop and remove bounds checks + // since the outer chunk size (64) is always a multiple of the number of lanes + chunk.chunks_exact(LANES).for_each(|chunk| { + let mut chunk: [T::Native; LANES] = chunk.try_into().unwrap(); + + // mask_select + for i in 0..LANES { + if mask & index_mask == 0 { + chunk[i] = T::default_value(); } index_mask <<= 1; - }); - }); + } - let remainder_bits = bit_chunks.remainder_bits(); + // by now blended[i] will be 0 if null, or else the value + for i in 0..LANES { + chunk_acc[i] = chunk_acc[i].add_wrapping(chunk[i]); + } + }) + }); remainder.iter().enumerate().for_each(|(i, value)| { if remainder_bits & (1 << i) != 0 { - sum = sum.add_wrapping(*value); + rem_acc = rem_acc.add_wrapping(*value); } }); + let mut reduced = T::default_value(); + for i in 0..LANES { + reduced = reduced.add_wrapping(chunk_acc[i]); + } + let sum = reduced.add_wrapping(rem_acc); + Some(sum) } } diff --git a/arrow/benches/aggregate_kernels.rs b/arrow/benches/aggregate_kernels.rs index c7b09f70f70..e159349adc2 100644 --- a/arrow/benches/aggregate_kernels.rs +++ b/arrow/benches/aggregate_kernels.rs @@ -29,6 +29,10 @@ fn bench_sum(arr_a: &Float32Array) { criterion::black_box(sum(arr_a).unwrap()); } +fn bench_sum_test(arr_a: &Float32Array) { + criterion::black_box(sum_test(arr_a).unwrap()); +} + fn bench_min(arr_a: &Float32Array) { criterion::black_box(min(arr_a).unwrap()); } @@ -45,12 +49,14 @@ fn add_benchmark(c: &mut Criterion) { let arr_a = create_primitive_array::(512, 0.0); c.bench_function("sum 512", |b| b.iter(|| bench_sum(&arr_a))); + c.bench_function("sum test 512", |b| b.iter(|| bench_sum_test(&arr_a))); c.bench_function("min 512", |b| b.iter(|| bench_min(&arr_a))); c.bench_function("max 512", |b| b.iter(|| bench_max(&arr_a))); let arr_a = create_primitive_array::(512, 0.5); c.bench_function("sum nulls 512", |b| b.iter(|| bench_sum(&arr_a))); + c.bench_function("sum nulls test 512", |b| b.iter(|| bench_sum_test(&arr_a))); c.bench_function("min nulls 512", |b| b.iter(|| bench_min(&arr_a))); c.bench_function("max nulls 512", |b| b.iter(|| bench_max(&arr_a))); From 4912ae3ed0d7ff584a7c68495d6a62b9f0373a8a Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sat, 22 Jul 2023 12:45:57 +0000 Subject: [PATCH 02/10] more --- arrow-arith/src/aggregate.rs | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index e09365d0125..348badced51 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -334,16 +334,33 @@ where // since the outer chunk size (64) is always a multiple of the number of lanes chunk.chunks_exact(LANES).for_each(|chunk| { let mut chunk: [T::Native; LANES] = chunk.try_into().unwrap(); + let mut set_zero = [false; LANES]; + // let mut zeros = [T::default_value(); LANES]; + + // blend + // fn generic_bit_blend(mask: T, y: T, n: T) -> T + // where + // T: Copy + BitXor + BitAnd, + // { + // n ^ ((n ^ y) & mask) + // } + // for i in 0..LANES { + // generic_bit_blend(mask, y, n) + // } + + for i in 0..LANES { + set_zero[i] = mask & index_mask == 0; + index_mask <<= 1; + } // mask_select for i in 0..LANES { - if mask & index_mask == 0 { + if set_zero[i] { chunk[i] = T::default_value(); } - index_mask <<= 1; } - // by now blended[i] will be 0 if null, or else the value + // by now chunk[i] will be 0 if null, or else the value for i in 0..LANES { chunk_acc[i] = chunk_acc[i].add_wrapping(chunk[i]); } From 51be1031d823e173d7798ea6c5043927a72a4f34 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sat, 22 Jul 2023 13:13:36 +0000 Subject: [PATCH 03/10] simd sum --- arrow-arith/src/aggregate.rs | 36 ++++++------------------------ arrow/benches/aggregate_kernels.rs | 6 ----- 2 files changed, 7 insertions(+), 35 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index 348badced51..1df5b49cd12 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -273,8 +273,8 @@ where /// /// This doesn't detect overflow. Once overflowing, the result will wrap around. /// For an overflow-checking variant, use `sum_checked` instead. -// #[cfg(not(feature = "simd"))] -pub fn sum_test(array: &PrimitiveArray) -> Option +#[cfg(not(feature = "simd"))] +pub fn sum(array: &PrimitiveArray) -> Option where T: ArrowNumericType, T::Native: ArrowNativeTypeOp, @@ -286,6 +286,7 @@ where } let data: &[T::Native] = array.values(); + // TODO choose lanes based on T::Native. Extract from simd module const LANES: usize = 16; let mut chunk_acc = [T::default_value(); LANES]; let mut rem_acc = T::default_value(); @@ -318,7 +319,6 @@ where Some(sum) } Some(nulls) => { - // let mut sum = T::default_value(); // process data in chunks of 64 elements since we also get 64 bits of validity information at a time let data_chunks = data.chunks_exact(64); let remainder = data_chunks.remainder(); @@ -327,43 +327,21 @@ where let remainder_bits = bit_chunks.remainder_bits(); data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| { - // index_mask has value 1 << i in the loop - let mut index_mask = 1; // split chunks further into slices corresponding to the vector length // the compiler is able to unroll this inner loop and remove bounds checks // since the outer chunk size (64) is always a multiple of the number of lanes chunk.chunks_exact(LANES).for_each(|chunk| { let mut chunk: [T::Native; LANES] = chunk.try_into().unwrap(); - let mut set_zero = [false; LANES]; - // let mut zeros = [T::default_value(); LANES]; - - // blend - // fn generic_bit_blend(mask: T, y: T, n: T) -> T - // where - // T: Copy + BitXor + BitAnd, - // { - // n ^ ((n ^ y) & mask) - // } - // for i in 0..LANES { - // generic_bit_blend(mask, y, n) - // } - - for i in 0..LANES { - set_zero[i] = mask & index_mask == 0; - index_mask <<= 1; - } - // mask_select for i in 0..LANES { - if set_zero[i] { + if mask & (1 << i) == 0 { chunk[i] = T::default_value(); } - } - - // by now chunk[i] will be 0 if null, or else the value - for i in 0..LANES { chunk_acc[i] = chunk_acc[i].add_wrapping(chunk[i]); } + + // skip the shift and avoid overflow for u8 type, which uses 64 lanes. + mask >>= LANES % 64; }) }); diff --git a/arrow/benches/aggregate_kernels.rs b/arrow/benches/aggregate_kernels.rs index e159349adc2..c7b09f70f70 100644 --- a/arrow/benches/aggregate_kernels.rs +++ b/arrow/benches/aggregate_kernels.rs @@ -29,10 +29,6 @@ fn bench_sum(arr_a: &Float32Array) { criterion::black_box(sum(arr_a).unwrap()); } -fn bench_sum_test(arr_a: &Float32Array) { - criterion::black_box(sum_test(arr_a).unwrap()); -} - fn bench_min(arr_a: &Float32Array) { criterion::black_box(min(arr_a).unwrap()); } @@ -49,14 +45,12 @@ fn add_benchmark(c: &mut Criterion) { let arr_a = create_primitive_array::(512, 0.0); c.bench_function("sum 512", |b| b.iter(|| bench_sum(&arr_a))); - c.bench_function("sum test 512", |b| b.iter(|| bench_sum_test(&arr_a))); c.bench_function("min 512", |b| b.iter(|| bench_min(&arr_a))); c.bench_function("max 512", |b| b.iter(|| bench_max(&arr_a))); let arr_a = create_primitive_array::(512, 0.5); c.bench_function("sum nulls 512", |b| b.iter(|| bench_sum(&arr_a))); - c.bench_function("sum nulls test 512", |b| b.iter(|| bench_sum_test(&arr_a))); c.bench_function("min nulls 512", |b| b.iter(|| bench_min(&arr_a))); c.bench_function("max nulls 512", |b| b.iter(|| bench_max(&arr_a))); From 857fdd4209794ea3f02e3d310c4dd5b91f0b4930 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 23 Jul 2023 12:20:54 +0000 Subject: [PATCH 04/10] fix clippy --- arrow-arith/src/aggregate.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index 1df5b49cd12..f654e866945 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -311,8 +311,8 @@ where }); let mut reduced = T::default_value(); - for i in 0..LANES { - reduced = reduced.add_wrapping(chunk_acc[i]); + for v in chunk_acc { + reduced = reduced.add_wrapping(v); } let sum = reduced.add_wrapping(rem_acc); @@ -352,8 +352,8 @@ where }); let mut reduced = T::default_value(); - for i in 0..LANES { - reduced = reduced.add_wrapping(chunk_acc[i]); + for v in chunk_acc { + reduced = reduced.add_wrapping(v); } let sum = reduced.add_wrapping(rem_acc); From 4bc686a3d0a986ac2ab1d068324027f06d63c548 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Tue, 25 Jul 2023 16:51:23 +0000 Subject: [PATCH 05/10] handle all number of lanes --- arrow-arith/src/aggregate.rs | 135 +++++++++++++++++++---------------- arrow-array/src/numeric.rs | 88 ++++++++++++++--------- 2 files changed, 128 insertions(+), 95 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index f654e866945..7cb55228ead 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -285,81 +285,96 @@ where return None; } - let data: &[T::Native] = array.values(); - // TODO choose lanes based on T::Native. Extract from simd module - const LANES: usize = 16; - let mut chunk_acc = [T::default_value(); LANES]; - let mut rem_acc = T::default_value(); + pub fn sum_impl(array: &PrimitiveArray) -> Option + where + T: ArrowNumericType, + T::Native: ArrowNativeTypeOp, + { + let data: &[T::Native] = array.values(); + let mut chunk_acc = [T::default_value(); LANES]; + let mut rem_acc = T::default_value(); - match array.nulls() { - None => { - let data_chunks = data.chunks_exact(64); - let remainder = data_chunks.remainder(); + match array.nulls() { + None => { + let data_chunks = data.chunks_exact(64); + let remainder = data_chunks.remainder(); + + data_chunks.for_each(|chunk| { + chunk.chunks_exact(LANES).for_each(|chunk| { + let chunk: [T::Native; LANES] = chunk.try_into().unwrap(); - data_chunks.for_each(|chunk| { - chunk.chunks_exact(LANES).for_each(|chunk| { - let chunk: [T::Native; LANES] = chunk.try_into().unwrap(); + for i in 0..LANES { + chunk_acc[i] = chunk_acc[i].add_wrapping(chunk[i]); + } + }) + }); - for i in 0..LANES { - chunk_acc[i] = chunk_acc[i].add_wrapping(chunk[i]); - } - }) - }); + remainder.iter().copied().for_each(|value| { + rem_acc = rem_acc.add_wrapping(value); + }); - remainder.iter().copied().for_each(|value| { - rem_acc = rem_acc.add_wrapping(value); - }); + let mut reduced = T::default_value(); + for v in chunk_acc { + reduced = reduced.add_wrapping(v); + } + let sum = reduced.add_wrapping(rem_acc); - let mut reduced = T::default_value(); - for v in chunk_acc { - reduced = reduced.add_wrapping(v); + Some(sum) } - let sum = reduced.add_wrapping(rem_acc); + Some(nulls) => { + // process data in chunks of 64 elements since we also get 64 bits of validity information at a time + let data_chunks = data.chunks_exact(64); + let remainder = data_chunks.remainder(); - Some(sum) - } - Some(nulls) => { - // process data in chunks of 64 elements since we also get 64 bits of validity information at a time - let data_chunks = data.chunks_exact(64); - let remainder = data_chunks.remainder(); - - let bit_chunks = nulls.inner().bit_chunks(); - let remainder_bits = bit_chunks.remainder_bits(); - - data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| { - // split chunks further into slices corresponding to the vector length - // the compiler is able to unroll this inner loop and remove bounds checks - // since the outer chunk size (64) is always a multiple of the number of lanes - chunk.chunks_exact(LANES).for_each(|chunk| { - let mut chunk: [T::Native; LANES] = chunk.try_into().unwrap(); - - for i in 0..LANES { - if mask & (1 << i) == 0 { - chunk[i] = T::default_value(); + let bit_chunks = nulls.inner().bit_chunks(); + let remainder_bits = bit_chunks.remainder_bits(); + + data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| { + // split chunks further into slices corresponding to the vector length + // the compiler is able to unroll this inner loop and remove bounds checks + // since the outer chunk size (64) is always a multiple of the number of lanes + chunk.chunks_exact(LANES).for_each(|chunk| { + let mut chunk: [T::Native; LANES] = chunk.try_into().unwrap(); + + for i in 0..LANES { + if mask & (1 << i) == 0 { + chunk[i] = T::default_value(); + } + chunk_acc[i] = chunk_acc[i].add_wrapping(chunk[i]); } - chunk_acc[i] = chunk_acc[i].add_wrapping(chunk[i]); - } - // skip the shift and avoid overflow for u8 type, which uses 64 lanes. - mask >>= LANES % 64; - }) - }); + // skip the shift and avoid overflow for u8 type, which uses 64 lanes. + mask >>= LANES % 64; + }) + }); + + remainder.iter().enumerate().for_each(|(i, value)| { + if remainder_bits & (1 << i) != 0 { + rem_acc = rem_acc.add_wrapping(*value); + } + }); - remainder.iter().enumerate().for_each(|(i, value)| { - if remainder_bits & (1 << i) != 0 { - rem_acc = rem_acc.add_wrapping(*value); + let mut reduced = T::default_value(); + for v in chunk_acc { + reduced = reduced.add_wrapping(v); } - }); + let sum = reduced.add_wrapping(rem_acc); - let mut reduced = T::default_value(); - for v in chunk_acc { - reduced = reduced.add_wrapping(v); + Some(sum) } - let sum = reduced.add_wrapping(rem_acc); - - Some(sum) } } + + match T::lanes() { + 1 => sum_impl::(array), + 2 => sum_impl::(array), + 4 => sum_impl::(array), + 8 => sum_impl::(array), + 16 => sum_impl::(array), + 32 => sum_impl::(array), + 64 => sum_impl::(array), + unhandled => unreachable!("Unhandled number of lanes: {unhandled}"), + } } macro_rules! bit_operation { diff --git a/arrow-array/src/numeric.rs b/arrow-array/src/numeric.rs index afc0e2c3301..65b431899de 100644 --- a/arrow-array/src/numeric.rs +++ b/arrow-array/src/numeric.rs @@ -113,10 +113,13 @@ where /// A subtype of primitive type that represents numeric values. #[cfg(not(feature = "simd"))] -pub trait ArrowNumericType: ArrowPrimitiveType {} +pub trait ArrowNumericType: ArrowPrimitiveType { + /// The number of SIMD lanes available + fn lanes() -> usize; +} macro_rules! make_numeric_type { - ($impl_ty:ty, $native_ty:ty, $simd_ty:ident, $simd_mask_ty:ident) => { + ($impl_ty:ty, $native_ty:ty, $simd_ty:ident, $simd_mask_ty:ident, $lanes:expr) => { #[cfg(feature = "simd")] impl ArrowNumericType for $impl_ty { type Simd = $simd_ty; @@ -125,7 +128,7 @@ macro_rules! make_numeric_type { #[inline] fn lanes() -> usize { - Self::Simd::lanes() + $lanes } #[inline] @@ -336,42 +339,52 @@ macro_rules! make_numeric_type { } #[cfg(not(feature = "simd"))] - impl ArrowNumericType for $impl_ty {} + impl ArrowNumericType for $impl_ty { + #[inline] + fn lanes() -> usize { + $lanes + } + } }; } -make_numeric_type!(Int8Type, i8, i8x64, m8x64); -make_numeric_type!(Int16Type, i16, i16x32, m16x32); -make_numeric_type!(Int32Type, i32, i32x16, m32x16); -make_numeric_type!(Int64Type, i64, i64x8, m64x8); -make_numeric_type!(UInt8Type, u8, u8x64, m8x64); -make_numeric_type!(UInt16Type, u16, u16x32, m16x32); -make_numeric_type!(UInt32Type, u32, u32x16, m32x16); -make_numeric_type!(UInt64Type, u64, u64x8, m64x8); -make_numeric_type!(Float32Type, f32, f32x16, m32x16); -make_numeric_type!(Float64Type, f64, f64x8, m64x8); - -make_numeric_type!(TimestampSecondType, i64, i64x8, m64x8); -make_numeric_type!(TimestampMillisecondType, i64, i64x8, m64x8); -make_numeric_type!(TimestampMicrosecondType, i64, i64x8, m64x8); -make_numeric_type!(TimestampNanosecondType, i64, i64x8, m64x8); -make_numeric_type!(Date32Type, i32, i32x16, m32x16); -make_numeric_type!(Date64Type, i64, i64x8, m64x8); -make_numeric_type!(Time32SecondType, i32, i32x16, m32x16); -make_numeric_type!(Time32MillisecondType, i32, i32x16, m32x16); -make_numeric_type!(Time64MicrosecondType, i64, i64x8, m64x8); -make_numeric_type!(Time64NanosecondType, i64, i64x8, m64x8); -make_numeric_type!(IntervalYearMonthType, i32, i32x16, m32x16); -make_numeric_type!(IntervalDayTimeType, i64, i64x8, m64x8); -make_numeric_type!(IntervalMonthDayNanoType, i128, i128x4, m128x4); -make_numeric_type!(DurationSecondType, i64, i64x8, m64x8); -make_numeric_type!(DurationMillisecondType, i64, i64x8, m64x8); -make_numeric_type!(DurationMicrosecondType, i64, i64x8, m64x8); -make_numeric_type!(DurationNanosecondType, i64, i64x8, m64x8); -make_numeric_type!(Decimal128Type, i128, i128x4, m128x4); +make_numeric_type!(Int8Type, i8, i8x64, m8x64, 64); +make_numeric_type!(Int16Type, i16, i16x32, m16x32, 32); +make_numeric_type!(Int32Type, i32, i32x16, m32x16, 16); +make_numeric_type!(Int64Type, i64, i64x8, m64x8, 8); +make_numeric_type!(UInt8Type, u8, u8x64, m8x64, 64); +make_numeric_type!(UInt16Type, u16, u16x32, m16x32, 32); +make_numeric_type!(UInt32Type, u32, u32x16, m32x16, 16); +make_numeric_type!(UInt64Type, u64, u64x8, m64x8, 8); +make_numeric_type!(Float32Type, f32, f32x16, m32x16, 16); +make_numeric_type!(Float64Type, f64, f64x8, m64x8, 8); + +make_numeric_type!(TimestampSecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(TimestampMillisecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(TimestampMicrosecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(TimestampNanosecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(Date32Type, i32, i32x16, m32x16, 16); +make_numeric_type!(Date64Type, i64, i64x8, m64x8, 8); +make_numeric_type!(Time32SecondType, i32, i32x16, m32x16, 16); +make_numeric_type!(Time32MillisecondType, i32, i32x16, m32x16, 16); +make_numeric_type!(Time64MicrosecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(Time64NanosecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(IntervalYearMonthType, i32, i32x16, m32x16, 16); +make_numeric_type!(IntervalDayTimeType, i64, i64x8, m64x8, 8); +make_numeric_type!(IntervalMonthDayNanoType, i128, i128x4, m128x4, 4); +make_numeric_type!(DurationSecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(DurationMillisecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(DurationMicrosecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(DurationNanosecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(Decimal128Type, i128, i128x4, m128x4, 4); #[cfg(not(feature = "simd"))] -impl ArrowNumericType for Float16Type {} +impl ArrowNumericType for Float16Type { + #[inline] + fn lanes() -> usize { + Float32Type::lanes() + } +} #[cfg(feature = "simd")] impl ArrowNumericType for Float16Type { @@ -467,7 +480,12 @@ impl ArrowNumericType for Float16Type { } #[cfg(not(feature = "simd"))] -impl ArrowNumericType for Decimal256Type {} +impl ArrowNumericType for Decimal256Type { + #[inline] + fn lanes() -> usize { + 1 + } +} #[cfg(feature = "simd")] impl ArrowNumericType for Decimal256Type { From f472f3fbd0a9ab76903071b70696789cfefbe341 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Tue, 25 Jul 2023 17:18:51 +0000 Subject: [PATCH 06/10] expand benchmarks for min, max, sum --- arrow/benches/aggregate_kernels.rs | 47 ++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/arrow/benches/aggregate_kernels.rs b/arrow/benches/aggregate_kernels.rs index c7b09f70f70..3b703030c81 100644 --- a/arrow/benches/aggregate_kernels.rs +++ b/arrow/benches/aggregate_kernels.rs @@ -17,6 +17,8 @@ #[macro_use] extern crate criterion; +use arrow_array::types::{TimestampMillisecondType, UInt8Type}; +use arrow_array::ArrowNumericType; use criterion::Criterion; extern crate arrow; @@ -24,16 +26,18 @@ extern crate arrow; use arrow::compute::kernels::aggregate::*; use arrow::util::bench_util::*; use arrow::{array::*, datatypes::Float32Type}; +use rand::distributions::Standard; +use rand::prelude::Distribution; -fn bench_sum(arr_a: &Float32Array) { +fn bench_sum(arr_a: &PrimitiveArray) { criterion::black_box(sum(arr_a).unwrap()); } -fn bench_min(arr_a: &Float32Array) { +fn bench_min(arr_a: &PrimitiveArray) { criterion::black_box(min(arr_a).unwrap()); } -fn bench_max(arr_a: &Float32Array) { +fn bench_max(arr_a: &PrimitiveArray) { criterion::black_box(max(arr_a).unwrap()); } @@ -41,18 +45,37 @@ fn bench_min_string(arr_a: &StringArray) { criterion::black_box(min_string(arr_a).unwrap()); } -fn add_benchmark(c: &mut Criterion) { - let arr_a = create_primitive_array::(512, 0.0); +fn sum_min_max_bench( + c: &mut Criterion, + size: usize, + null_density: f32, + description: &str, +) where + T: ArrowNumericType, + Standard: Distribution, +{ + let arr_a = create_primitive_array::(size, null_density); - c.bench_function("sum 512", |b| b.iter(|| bench_sum(&arr_a))); - c.bench_function("min 512", |b| b.iter(|| bench_min(&arr_a))); - c.bench_function("max 512", |b| b.iter(|| bench_max(&arr_a))); + c.bench_function(&format!("sum {size} {description}"), |b| { + b.iter(|| bench_sum(&arr_a)) + }); + c.bench_function(&format!("min {size} {description}"), |b| { + b.iter(|| bench_min(&arr_a)) + }); + c.bench_function(&format!("max {size} {description}"), |b| { + b.iter(|| bench_max(&arr_a)) + }); +} + +fn add_benchmark(c: &mut Criterion) { + sum_min_max_bench::(c, 512, 0.0, "u8 no nulls"); + sum_min_max_bench::(c, 512, 0.5, "u8 50% nulls"); - let arr_a = create_primitive_array::(512, 0.5); + sum_min_max_bench::(c, 512, 0.0, "ts_millis no nulls"); + sum_min_max_bench::(c, 512, 0.5, "ts_millis 50% nulls"); - c.bench_function("sum nulls 512", |b| b.iter(|| bench_sum(&arr_a))); - c.bench_function("min nulls 512", |b| b.iter(|| bench_min(&arr_a))); - c.bench_function("max nulls 512", |b| b.iter(|| bench_max(&arr_a))); + sum_min_max_bench::(c, 512, 0.0, "f32 no nulls"); + sum_min_max_bench::(c, 512, 0.5, "f32 50% nulls"); let arr_b = create_string_array::(512, 0.0); c.bench_function("min string 512", |b| b.iter(|| bench_min_string(&arr_b))); From be004924c21da69340d579932168b500d36a38b9 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sat, 29 Jul 2023 18:07:17 +0000 Subject: [PATCH 07/10] don't change code under simd feature flag --- arrow-array/src/numeric.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-array/src/numeric.rs b/arrow-array/src/numeric.rs index 65b431899de..68ecb74bc86 100644 --- a/arrow-array/src/numeric.rs +++ b/arrow-array/src/numeric.rs @@ -128,7 +128,7 @@ macro_rules! make_numeric_type { #[inline] fn lanes() -> usize { - $lanes + Self::Simd::lanes() } #[inline] From 94f7b1806605470f7930de56723ffe547819e244 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 30 Jul 2023 17:29:37 +0000 Subject: [PATCH 08/10] separate impl for floating and integers --- arrow-arith/src/aggregate.rs | 119 ++++++++++++++++++++++++----- arrow/benches/aggregate_kernels.rs | 16 +++- 2 files changed, 117 insertions(+), 18 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index 7cb55228ead..4ce271e6fbc 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -285,7 +285,56 @@ where return None; } - pub fn sum_impl(array: &PrimitiveArray) -> Option + pub fn sum_impl_integer(array: &PrimitiveArray) -> Option + where + T: ArrowNumericType, + T::Native: ArrowNativeTypeOp, + { + let data: &[T::Native] = array.values(); + + match array.nulls() { + None => { + let sum = data.iter().fold(T::default_value(), |accumulator, value| { + accumulator.add_wrapping(*value) + }); + + Some(sum) + } + Some(nulls) => { + let mut sum = T::default_value(); + let data_chunks = data.chunks_exact(64); + let remainder = data_chunks.remainder(); + + let bit_chunks = nulls.inner().bit_chunks(); + data_chunks + .zip(bit_chunks.iter()) + .for_each(|(chunk, mask)| { + // index_mask has value 1 << i in the loop + let mut index_mask = 1; + chunk.iter().for_each(|value| { + if (mask & index_mask) != 0 { + sum = sum.add_wrapping(*value); + } + index_mask <<= 1; + }); + }); + + let remainder_bits = bit_chunks.remainder_bits(); + + remainder.iter().enumerate().for_each(|(i, value)| { + if remainder_bits & (1 << i) != 0 { + sum = sum.add_wrapping(*value); + } + }); + + Some(sum) + } + } + } + + pub fn sum_impl_floating( + array: &PrimitiveArray, + ) -> Option where T: ArrowNumericType, T::Native: ArrowNativeTypeOp, @@ -296,17 +345,15 @@ where match array.nulls() { None => { - let data_chunks = data.chunks_exact(64); + let data_chunks = data.chunks_exact(LANES); let remainder = data_chunks.remainder(); data_chunks.for_each(|chunk| { - chunk.chunks_exact(LANES).for_each(|chunk| { - let chunk: [T::Native; LANES] = chunk.try_into().unwrap(); + let chunk: [T::Native; LANES] = chunk.try_into().unwrap(); - for i in 0..LANES { - chunk_acc[i] = chunk_acc[i].add_wrapping(chunk[i]); - } - }) + for i in 0..LANES { + chunk_acc[i] = chunk_acc[i].add_wrapping(chunk[i]); + } }); remainder.iter().copied().for_each(|value| { @@ -365,15 +412,53 @@ where } } - match T::lanes() { - 1 => sum_impl::(array), - 2 => sum_impl::(array), - 4 => sum_impl::(array), - 8 => sum_impl::(array), - 16 => sum_impl::(array), - 32 => sum_impl::(array), - 64 => sum_impl::(array), - unhandled => unreachable!("Unhandled number of lanes: {unhandled}"), + match T::DATA_TYPE { + DataType::Timestamp(_, _) + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Date32 + | DataType::Date64 + | DataType::Duration(_) + | DataType::Interval(_) + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 => sum_impl_integer(array), + DataType::Float16 + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) => match T::lanes() { + 1 => sum_impl_floating::(array), + 2 => sum_impl_floating::(array), + 4 => sum_impl_floating::(array), + 8 => sum_impl_floating::(array), + 16 => sum_impl_floating::(array), + 32 => sum_impl_floating::(array), + 64 => sum_impl_floating::(array), + unhandled => unreachable!("Unhandled number of lanes: {unhandled}"), + }, + DataType::Null + | DataType::Boolean + | DataType::Binary + | DataType::FixedSizeBinary(_) + | DataType::LargeBinary + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::List(_) + | DataType::FixedSizeList(_, _) + | DataType::LargeList(_) + | DataType::Struct(_) + | DataType::Union(_, _) + | DataType::Dictionary(_, _) + | DataType::Map(_, _) + | DataType::RunEndEncoded(_, _) => { + unreachable!("Unsupported data type: {:?}", T::DATA_TYPE) + } } } diff --git a/arrow/benches/aggregate_kernels.rs b/arrow/benches/aggregate_kernels.rs index 3b703030c81..7536be2365e 100644 --- a/arrow/benches/aggregate_kernels.rs +++ b/arrow/benches/aggregate_kernels.rs @@ -17,7 +17,9 @@ #[macro_use] extern crate criterion; -use arrow_array::types::{TimestampMillisecondType, UInt8Type}; +use arrow_array::types::{ + Float64Type, TimestampMillisecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, +}; use arrow_array::ArrowNumericType; use criterion::Criterion; @@ -71,12 +73,24 @@ fn add_benchmark(c: &mut Criterion) { sum_min_max_bench::(c, 512, 0.0, "u8 no nulls"); sum_min_max_bench::(c, 512, 0.5, "u8 50% nulls"); + sum_min_max_bench::(c, 512, 0.0, "u16 no nulls"); + sum_min_max_bench::(c, 512, 0.5, "u16 50% nulls"); + + sum_min_max_bench::(c, 512, 0.0, "u32 no nulls"); + sum_min_max_bench::(c, 512, 0.5, "u32 50% nulls"); + + sum_min_max_bench::(c, 512, 0.0, "u64 no nulls"); + sum_min_max_bench::(c, 512, 0.5, "u64 50% nulls"); + sum_min_max_bench::(c, 512, 0.0, "ts_millis no nulls"); sum_min_max_bench::(c, 512, 0.5, "ts_millis 50% nulls"); sum_min_max_bench::(c, 512, 0.0, "f32 no nulls"); sum_min_max_bench::(c, 512, 0.5, "f32 50% nulls"); + sum_min_max_bench::(c, 512, 0.0, "f64 no nulls"); + sum_min_max_bench::(c, 512, 0.5, "f64 50% nulls"); + let arr_b = create_string_array::(512, 0.0); c.bench_function("min string 512", |b| b.iter(|| bench_min_string(&arr_b))); From 181c6da0e85b6f8041bbe643fe84ceefaa3bdb4d Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 30 Jul 2023 17:31:11 +0000 Subject: [PATCH 09/10] make not pub --- arrow-arith/src/aggregate.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index 4ce271e6fbc..04c99fe6797 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -285,7 +285,7 @@ where return None; } - pub fn sum_impl_integer(array: &PrimitiveArray) -> Option + fn sum_impl_integer(array: &PrimitiveArray) -> Option where T: ArrowNumericType, T::Native: ArrowNativeTypeOp, @@ -332,7 +332,7 @@ where } } - pub fn sum_impl_floating( + fn sum_impl_floating( array: &PrimitiveArray, ) -> Option where From 0a03c833074c3d4a818769d4f347cae93283de4c Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 30 Jul 2023 17:33:54 +0000 Subject: [PATCH 10/10] remove unneeded mod --- arrow-arith/src/aggregate.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs index 04c99fe6797..e6314c973f4 100644 --- a/arrow-arith/src/aggregate.rs +++ b/arrow-arith/src/aggregate.rs @@ -390,8 +390,7 @@ where chunk_acc[i] = chunk_acc[i].add_wrapping(chunk[i]); } - // skip the shift and avoid overflow for u8 type, which uses 64 lanes. - mask >>= LANES % 64; + mask >>= LANES; }) });