Skip to content

Commit

Permalink
filter fast path
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 24, 2021
1 parent bace8d4 commit a18608b
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions arrow/src/compute/kernels/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
let chunks = iter.collect::<Vec<_>>();

Ok(Box::new(move |array: &ArrayData| {
if filter_count == array.len() {
return array.clone();
}

let mut mutable = MutableArrayData::new(vec![array], false, filter_count);
chunks
.iter()
Expand All @@ -205,7 +209,8 @@ pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
}))
}

fn prepare_filter(filter: &BooleanArray) -> BooleanArray {
/// Remove null values by do a bitmask AND operation with null bits and the boolean bits.
fn prep_null_mask_filter(filter: &BooleanArray) -> BooleanArray {
let array_data = filter.data_ref();
let null_bitmap = array_data.null_buffer().unwrap();
let mask = filter.values();
Expand Down Expand Up @@ -240,18 +245,22 @@ pub fn filter(array: &Array, filter: &BooleanArray) -> Result<ArrayRef> {
if filter.null_count() > 0 {
// this greatly simplifies subsequent filtering code
// now we only have a boolean mask to deal with
let filter = prepare_filter(filter);
let filter = prep_null_mask_filter(filter);
// fully qualified syntax, because we have an argument with the same name
return crate::compute::kernels::filter::filter(array, &filter);
}

let iter = SlicesIterator::new(filter);

let mut mutable =
MutableArrayData::new(vec![array.data_ref()], false, iter.filter_count);
iter.for_each(|(start, end)| mutable.extend(0, start, end));
let data = mutable.freeze();
Ok(make_array(data))
if iter.filter_count == array.len() {
let data = array.data().clone();
Ok(make_array(data))
} else {
let mut mutable =
MutableArrayData::new(vec![array.data_ref()], false, iter.filter_count);
iter.for_each(|(start, end)| mutable.extend(0, start, end));
let data = mutable.freeze();
Ok(make_array(data))
}
}

/// Returns a new [RecordBatch] with arrays containing only values matching the filter.
Expand All @@ -262,7 +271,7 @@ pub fn filter_record_batch(
if filter.null_count() > 0 {
// this greatly simplifies subsequent filtering code
// now we only have a boolean mask to deal with
let filter = prepare_filter(filter);
let filter = prep_null_mask_filter(filter);
// fully qualified syntax, because we have an argument with the same name
return crate::compute::kernels::filter::filter_record_batch(
record_batch,
Expand Down Expand Up @@ -638,4 +647,18 @@ mod tests {
assert_eq!(out_arr0, out_arr1);
Ok(())
}

#[test]
fn test_fast_path() -> Result<()> {
let a: PrimitiveArray<Int64Type> =
PrimitiveArray::from(vec![Some(1), Some(2), None]);
let mask = BooleanArray::from(vec![true, true, true]);
let out = filter(&a, &mask)?;
let b = out
.as_any()
.downcast_ref::<PrimitiveArray<Int64Type>>()
.unwrap();
assert_eq!(&a, b);
Ok(())
}
}

0 comments on commit a18608b

Please sign in to comment.