From d5f4a479990ccd3e4fbde758b84c87db19c1b28e Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Sat, 30 Mar 2024 16:40:15 +0000 Subject: [PATCH 1/5] initial version of filter for run end array with i64 run_ends --- arrow-select/src/filter.rs | 113 ++++++++++++++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 2 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index c365d0b841b..276ded011f9 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -21,9 +21,9 @@ use std::sync::Arc; use arrow_array::builder::BooleanBufferBuilder; use arrow_array::cast::AsArray; -use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType}; +use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType, Int64Type}; use arrow_array::*; -use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer}; +use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer, RunEndBuffer}; use arrow_buffer::{Buffer, MutableBuffer}; use arrow_data::bit_iterator::{BitIndexIterator, BitSliceIterator}; use arrow_data::transform::MutableArrayData; @@ -336,6 +336,13 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result { Ok(Arc::new(filter_bytes(values.as_binary::(), predicate))) } + DataType::RunEndEncoded(_, _) => { + if let Some(ree_array) = values.as_any().downcast_ref::>() { + Ok(Arc::new(filter_run_end_array(ree_array, predicate)?)) + } else { + unimplemented!("Filter not supported for RunEndEncoded type {:?}", values.data_type()) + } + } DataType::Dictionary(_, _) => downcast_dictionary_array! { values => Ok(Arc::new(filter_dict(values, predicate))), t => unimplemented!("Filter not supported for dictionary type {:?}", t) @@ -368,6 +375,42 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result, + pred: &FilterPredicate, +) -> Result, ArrowError> { + let run_ends: &RunEndBuffer = re_arr.run_ends(); + let mut values_filter = BooleanBufferBuilder::new(run_ends.len()); + let mut new_run_ends = vec![0i64; run_ends.len()]; + + let mut start = 0i64; + let mut i = 0; + let filter_values = pred.filter.values(); + let mut count = 0; + for end in run_ends.inner() { + let mut keep = false; + for pred in (start..*end).map(|i| unsafe { filter_values.value_unchecked(i as usize) }) { + count += pred as i64; + keep |= pred + } + new_run_ends[i] = count; + i += keep as usize; + values_filter.append(keep); + start = *end; + } + + new_run_ends.truncate(i); + + if values_filter.is_empty() { + new_run_ends.clear(); + } + + let values = re_arr.values(); + let pred = BooleanArray::new(values_filter.finish(), None); + let new_values = filter(&values, &pred)?; + RunArray::try_new(&PrimitiveArray::from(new_run_ends), &new_values) +} + /// Computes a new null mask for `data` based on `predicate` /// /// If the predicate selected no null-rows, returns `None`, otherwise returns @@ -844,6 +887,72 @@ mod tests { assert_eq!(9, d.value(1)); } + #[test] + fn test_filter_run_end_encoding_array() { + let run_ends = Int64Array::from(vec![2, 3, 8]); + let values = Int64Array::from(vec![7, -2, 9]); + let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray"); + let b = BooleanArray::from(vec![true, false, true, false, true, false, true, false]); + let c = filter(&a, &b).unwrap(); + let actual = c + .as_ref() + .as_any() + .downcast_ref::>() + .unwrap(); + assert_eq!(4, actual.len()); + + let expected = RunArray::try_new( + &Int64Array::from(vec![1, 2, 4]), + &Int64Array::from(vec![7, -2, 9]), + ) + .expect("Failed to make expected RunArray test is broken"); + + assert_eq!(&actual.run_ends().values(), &expected.run_ends().values()); + assert_eq!(actual.values(), expected.values()) + } + + #[test] + fn test_filter_run_end_encoding_array_remove_value() { + let run_ends = Int64Array::from(vec![2, 3, 8, 10]); + let values = Int64Array::from(vec![7, -2, 9, -8]); + let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray"); + let b = BooleanArray::from(vec![ + false, true, false, false, true, false, true, false, false, false, + ]); + let c = filter(&a, &b).unwrap(); + let actual = c + .as_ref() + .as_any() + .downcast_ref::>() + .unwrap(); + assert_eq!(3, actual.len()); + + let expected = + RunArray::try_new(&Int64Array::from(vec![1, 3]), &Int64Array::from(vec![7, 9])) + .expect("Failed to make expected RunArray test is broken"); + + assert_eq!(&actual.run_ends().values(), &expected.run_ends().values()); + assert_eq!(actual.values(), expected.values()) + } + + #[test] + fn test_filter_run_end_encoding_array_empty() { + let run_ends = Int64Array::from(vec![2, 3, 8, 10]); + let values = Int64Array::from(vec![7, -2, 9, -8]); + let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray"); + let b = BooleanArray::from(vec![ + false, false, false, false, false, false, false, false, false, false, + ]); + let c = filter(&a, &b).unwrap(); + let actual = c + .as_ref() + .as_any() + .downcast_ref::>() + .unwrap(); + + assert_eq!(0, actual.len()); + } + #[test] fn test_filter_dictionary_array() { let values = [Some("hello"), None, Some("world"), Some("!")]; From 0751ad980ad4242fe00dfe7293fc97ed5e098257 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Sat, 30 Mar 2024 17:56:16 +0000 Subject: [PATCH 2/5] add comment on filter_run_end_array --- arrow-select/src/filter.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 276ded011f9..1802b249712 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -375,6 +375,7 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result, pred: &FilterPredicate, From 34cf37f387e9722a8fe4cb5b6457e05d7299c084 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Sun, 31 Mar 2024 15:12:18 +0100 Subject: [PATCH 3/5] add notes for value_unchecked and use as_run_array in tests --- arrow-select/src/filter.rs | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 1802b249712..9ec848d9ee4 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -388,14 +388,20 @@ fn filter_run_end_array( let mut i = 0; let filter_values = pred.filter.values(); let mut count = 0; + for end in run_ends.inner() { let mut keep = false; + // in filter_array the predicate array is checked to have the same len as the run end array + // this means the largest value in the run_ends is == to pred.len() + // so we're always within bounds when calling value_unchecked for pred in (start..*end).map(|i| unsafe { filter_values.value_unchecked(i as usize) }) { count += pred as i64; keep |= pred } + // this is to avoid branching new_run_ends[i] = count; i += keep as usize; + values_filter.append(keep); start = *end; } @@ -679,6 +685,7 @@ where #[cfg(test)] mod tests { use arrow_array::builder::*; + use arrow_array::cast::as_run_array; use arrow_array::types::*; use rand::distributions::{Alphanumeric, Standard}; use rand::prelude::*; @@ -895,11 +902,7 @@ mod tests { let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray"); let b = BooleanArray::from(vec![true, false, true, false, true, false, true, false]); let c = filter(&a, &b).unwrap(); - let actual = c - .as_ref() - .as_any() - .downcast_ref::>() - .unwrap(); + let actual: &RunArray = as_run_array(&c); assert_eq!(4, actual.len()); let expected = RunArray::try_new( @@ -921,11 +924,7 @@ mod tests { false, true, false, false, true, false, true, false, false, false, ]); let c = filter(&a, &b).unwrap(); - let actual = c - .as_ref() - .as_any() - .downcast_ref::>() - .unwrap(); + let actual: &RunArray = as_run_array(&c); assert_eq!(3, actual.len()); let expected = @@ -945,12 +944,7 @@ mod tests { false, false, false, false, false, false, false, false, false, false, ]); let c = filter(&a, &b).unwrap(); - let actual = c - .as_ref() - .as_any() - .downcast_ref::>() - .unwrap(); - + let actual: &RunArray = as_run_array(&c); assert_eq!(0, actual.len()); } From 540e5488fa483e08a6c2e4fd8e9ad304ecb0a93b Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Sun, 31 Mar 2024 16:03:12 +0100 Subject: [PATCH 4/5] made RunArray filtering generic on all run_ends types --- arrow-select/src/filter.rs | 101 +++++++++++++++++++++++++++++-------- 1 file changed, 80 insertions(+), 21 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 9ec848d9ee4..b0e25a93842 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -17,11 +17,15 @@ //! Defines filter kernels +use std::ops::AddAssign; use std::sync::Arc; use arrow_array::builder::BooleanBufferBuilder; use arrow_array::cast::AsArray; -use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType, Int64Type}; +use arrow_array::types::{ + ArrowDictionaryKeyType, ArrowPrimitiveType, ByteArrayType, Int16Type, Int32Type, Int64Type, + RunEndIndexType, +}; use arrow_array::*; use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer, RunEndBuffer}; use arrow_buffer::{Buffer, MutableBuffer}; @@ -337,10 +341,9 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result(), predicate))) } DataType::RunEndEncoded(_, _) => { - if let Some(ree_array) = values.as_any().downcast_ref::>() { - Ok(Arc::new(filter_run_end_array(ree_array, predicate)?)) - } else { - unimplemented!("Filter not supported for RunEndEncoded type {:?}", values.data_type()) + downcast_run_array!{ + values => filter_run_end_array(values, predicate), + t => unimplemented!("Filter not supported for RunEndEncoded type {:?}", t) } } DataType::Dictionary(_, _) => downcast_dictionary_array! { @@ -375,27 +378,64 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result, +fn filter_run_end_array( + ree_arr: &RunArray, pred: &FilterPredicate, -) -> Result, ArrowError> { - let run_ends: &RunEndBuffer = re_arr.run_ends(); +) -> Result { + fn downcast_safe( + re_arr: &RunArray, + ) -> Option<&RunArray> { + re_arr.as_any().downcast_ref::>() + } + + if let Some(ree_array) = downcast_safe::(ree_arr) { + let (run_ends, values) = filter_run_end_array_generic(ree_array, pred)?; + let ree_arr: RunArray = + RunArray::try_new(&PrimitiveArray::from(run_ends), &values)?; + Ok(Arc::new(ree_arr)) + } else if let Some(ree_array) = downcast_safe::(ree_arr) { + let (run_ends, values) = filter_run_end_array_generic(ree_array, pred)?; + let ree_arr: RunArray = + RunArray::try_new(&PrimitiveArray::from(run_ends), &values)?; + Ok(Arc::new(ree_arr)) + } else if let Some(ree_array) = downcast_safe::(ree_arr) { + let (run_ends, values) = filter_run_end_array_generic(ree_array, pred)?; + let ree_arr: RunArray = + RunArray::try_new(&PrimitiveArray::from(run_ends), &values)?; + Ok(Arc::new(ree_arr)) + } else { + Err(ArrowError::CastError( + "Run ends for RunArray must be i16 or i32 or i64".to_string(), + )) + } +} + +/// Filter any supported [`RunArray`] based on a [`FilterPredicate`] +#[allow(clippy::type_complexity)] +fn filter_run_end_array_generic( + re_arr: &RunArray, + pred: &FilterPredicate, +) -> Result<(Vec, Arc), ArrowError> +where + R::Native: Into + From, + R::Native: AddAssign, +{ + let run_ends: &RunEndBuffer = re_arr.run_ends(); let mut values_filter = BooleanBufferBuilder::new(run_ends.len()); - let mut new_run_ends = vec![0i64; run_ends.len()]; + let mut new_run_ends = vec![R::default_value(); run_ends.len()]; let mut start = 0i64; let mut i = 0; let filter_values = pred.filter.values(); - let mut count = 0; + let mut count = R::default_value(); - for end in run_ends.inner() { + for end in run_ends.inner().into_iter().map(|i| (*i).into()) { let mut keep = false; // in filter_array the predicate array is checked to have the same len as the run end array // this means the largest value in the run_ends is == to pred.len() // so we're always within bounds when calling value_unchecked - for pred in (start..*end).map(|i| unsafe { filter_values.value_unchecked(i as usize) }) { - count += pred as i64; + for pred in (start..end).map(|i| unsafe { filter_values.value_unchecked(i as usize) }) { + count += R::Native::from(pred); keep |= pred } // this is to avoid branching @@ -403,7 +443,7 @@ fn filter_run_end_array( i += keep as usize; values_filter.append(keep); - start = *end; + start = end; } new_run_ends.truncate(i); @@ -415,7 +455,7 @@ fn filter_run_end_array( let values = re_arr.values(); let pred = BooleanArray::new(values_filter.finish(), None); let new_values = filter(&values, &pred)?; - RunArray::try_new(&PrimitiveArray::from(new_run_ends), &new_values) + Ok((new_run_ends, new_values)) } /// Computes a new null mask for `data` based on `predicate` @@ -917,24 +957,43 @@ mod tests { #[test] fn test_filter_run_end_encoding_array_remove_value() { - let run_ends = Int64Array::from(vec![2, 3, 8, 10]); - let values = Int64Array::from(vec![7, -2, 9, -8]); + let run_ends = Int32Array::from(vec![2, 3, 8, 10]); + let values = Int32Array::from(vec![7, -2, 9, -8]); let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray"); let b = BooleanArray::from(vec![ false, true, false, false, true, false, true, false, false, false, ]); let c = filter(&a, &b).unwrap(); - let actual: &RunArray = as_run_array(&c); + let actual: &RunArray = as_run_array(&c); assert_eq!(3, actual.len()); let expected = - RunArray::try_new(&Int64Array::from(vec![1, 3]), &Int64Array::from(vec![7, 9])) + RunArray::try_new(&Int32Array::from(vec![1, 3]), &Int32Array::from(vec![7, 9])) .expect("Failed to make expected RunArray test is broken"); assert_eq!(&actual.run_ends().values(), &expected.run_ends().values()); assert_eq!(actual.values(), expected.values()) } + #[test] + fn test_filter_run_end_encoding_array_remove_all_but_one() { + let run_ends = Int16Array::from(vec![2, 3, 8, 10]); + let values = Int16Array::from(vec![7, -2, 9, -8]); + let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray"); + let b = BooleanArray::from(vec![ + false, false, false, false, false, false, true, false, false, false, + ]); + let c = filter(&a, &b).unwrap(); + let actual: &RunArray = as_run_array(&c); + assert_eq!(1, actual.len()); + + let expected = RunArray::try_new(&Int16Array::from(vec![1]), &Int16Array::from(vec![9])) + .expect("Failed to make expected RunArray test is broken"); + + assert_eq!(&actual.run_ends().values(), &expected.run_ends().values()); + assert_eq!(actual.values(), expected.values()) + } + #[test] fn test_filter_run_end_encoding_array_empty() { let run_ends = Int64Array::from(vec![2, 3, 8, 10]); From 9dd3eda0f6ac4fa8e8295360182f2cf4aef6edc7 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Tue, 2 Apr 2024 10:01:31 +0100 Subject: [PATCH 5/5] create a RunEnd array in a generic form to simplify filter_run_end_array --- arrow-select/src/filter.rs | 48 +++++++------------------------------- 1 file changed, 8 insertions(+), 40 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index b0e25a93842..2af19ff8505 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -23,8 +23,7 @@ use std::sync::Arc; use arrow_array::builder::BooleanBufferBuilder; use arrow_array::cast::AsArray; use arrow_array::types::{ - ArrowDictionaryKeyType, ArrowPrimitiveType, ByteArrayType, Int16Type, Int32Type, Int64Type, - RunEndIndexType, + ArrowDictionaryKeyType, ArrowPrimitiveType, ByteArrayType, RunEndIndexType, }; use arrow_array::*; use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer, RunEndBuffer}; @@ -342,7 +341,7 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result { downcast_run_array!{ - values => filter_run_end_array(values, predicate), + values => Ok(Arc::new(filter_run_end_array(values, predicate)?)), t => unimplemented!("Filter not supported for RunEndEncoded type {:?}", t) } } @@ -378,44 +377,11 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result( - ree_arr: &RunArray, - pred: &FilterPredicate, -) -> Result { - fn downcast_safe( - re_arr: &RunArray, - ) -> Option<&RunArray> { - re_arr.as_any().downcast_ref::>() - } - - if let Some(ree_array) = downcast_safe::(ree_arr) { - let (run_ends, values) = filter_run_end_array_generic(ree_array, pred)?; - let ree_arr: RunArray = - RunArray::try_new(&PrimitiveArray::from(run_ends), &values)?; - Ok(Arc::new(ree_arr)) - } else if let Some(ree_array) = downcast_safe::(ree_arr) { - let (run_ends, values) = filter_run_end_array_generic(ree_array, pred)?; - let ree_arr: RunArray = - RunArray::try_new(&PrimitiveArray::from(run_ends), &values)?; - Ok(Arc::new(ree_arr)) - } else if let Some(ree_array) = downcast_safe::(ree_arr) { - let (run_ends, values) = filter_run_end_array_generic(ree_array, pred)?; - let ree_arr: RunArray = - RunArray::try_new(&PrimitiveArray::from(run_ends), &values)?; - Ok(Arc::new(ree_arr)) - } else { - Err(ArrowError::CastError( - "Run ends for RunArray must be i16 or i32 or i64".to_string(), - )) - } -} - /// Filter any supported [`RunArray`] based on a [`FilterPredicate`] -#[allow(clippy::type_complexity)] -fn filter_run_end_array_generic( +fn filter_run_end_array( re_arr: &RunArray, pred: &FilterPredicate, -) -> Result<(Vec, Arc), ArrowError> +) -> Result, ArrowError> where R::Native: Into + From, R::Native: AddAssign, @@ -454,8 +420,10 @@ where let values = re_arr.values(); let pred = BooleanArray::new(values_filter.finish(), None); - let new_values = filter(&values, &pred)?; - Ok((new_run_ends, new_values)) + let values = filter(&values, &pred)?; + + let run_ends = PrimitiveArray::::new(new_run_ends.into(), None); + RunArray::try_new(&run_ends, &values) } /// Computes a new null mask for `data` based on `predicate`