From 19a87873074dfed49bdbf15f597074a1b0c22778 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 06:37:38 -0600 Subject: [PATCH 1/4] Provide configuration option for which kernel to use to filter batches --- datafusion/physical-plan/src/filter.rs | 52 +++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 568987b14798..69456f23ff06 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -29,9 +29,12 @@ use crate::{ DisplayFormatType, ExecutionPlan, }; -use arrow::compute::filter_record_batch; +use arrow::compute::{filter_record_batch, take, FilterBuilder}; use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; +use arrow_array::builder::Int32Builder; +use arrow_array::{BooleanArray, RecordBatchOptions}; +use arrow_schema::ArrowError; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; use datafusion_common::{internal_err, plan_err, DataFusionError, Result}; @@ -47,6 +50,18 @@ use datafusion_physical_expr::{ use futures::stream::{Stream, StreamExt}; use log::trace; +/// Options that control how FilterExec processes batches +#[derive(Debug)] +pub enum FilterKernel { + /// Use Arrow's `filter_array` which can return input batches without copying data when the + /// predicate evaluates to true for all rows in a batch + Arrow, + /// Use the `take` kernel to produce filtered batches, ensuring that no input batches are + /// passed through (can be useful for systems where the input to FilterExec can re-use + /// arrays) + Take, +} + /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to /// include in its output batches. #[derive(Debug)] @@ -61,6 +76,8 @@ pub struct FilterExec { default_selectivity: u8, /// Properties equivalence properties, partitioning, etc. cache: PlanProperties, + /// Which kernel to use when filtering batches + filter_kernel: FilterKernel, } impl FilterExec { @@ -68,6 +85,15 @@ impl FilterExec { pub fn try_new( predicate: Arc, input: Arc, + ) -> Result { + Self::try_new_with_kernel(predicate, input, FilterKernel::Arrow) + } + + /// Create a FilterExec on an input using the specified kernel to create filtered batches + pub fn try_new_with_kernel( + predicate: Arc, + input: Arc, + filter_kernel: FilterKernel, ) -> Result { match predicate.data_type(input.schema().as_ref())? { DataType::Boolean => { @@ -80,6 +106,7 @@ impl FilterExec { metrics: ExecutionPlanMetricsSet::new(), default_selectivity, cache, + filter_kernel, }) } other => { @@ -367,6 +394,29 @@ pub(crate) fn batch_filter( }) } +fn filter_record_batch_with_take( + record_batch: &RecordBatch, + predicate: &BooleanArray, +) -> std::result::Result { + // turn predicate into selection vector + let mut sv = Int32Builder::with_capacity(predicate.true_count()); + for i in 0..predicate.len() { + if predicate.value(i) { + sv.append_value(i as i32); + } + } + let sv = sv.finish(); + + // use take to ensure that no input batches are passed through + let filtered_arrays = record_batch + .columns() + .iter() + .map(|a| take(a, &sv, None)) + .collect::, _>>()?; + let options = RecordBatchOptions::default().with_row_count(Some(sv.len())); + RecordBatch::try_new_with_options(record_batch.schema(), filtered_arrays, &options) +} + impl Stream for FilterExecStream { type Item = Result; From c9157b22040c51366b15ef56fabc6f784c74aeb4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 06:44:25 -0600 Subject: [PATCH 2/4] integrate --- .../src/aggregates/no_grouping.rs | 6 +++-- datafusion/physical-plan/src/filter.rs | 24 +++++++++++++------ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index 99417e4ee3e9..bb8cb7dea8af 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -33,7 +33,7 @@ use std::borrow::Cow; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::filter::batch_filter; +use crate::filter::{batch_filter, FilterKernel}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use futures::stream::{Stream, StreamExt}; @@ -215,7 +215,9 @@ fn aggregate_batch( .try_for_each(|((accum, expr), filter)| { // 1.2 let batch = match filter { - Some(filter) => Cow::Owned(batch_filter(&batch, filter)?), + Some(filter) => { + Cow::Owned(batch_filter(&batch, filter, &FilterKernel::Default)?) + } None => Cow::Borrowed(&batch), }; diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 69456f23ff06..6ec102ea8df2 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -29,7 +29,7 @@ use crate::{ DisplayFormatType, ExecutionPlan, }; -use arrow::compute::{filter_record_batch, take, FilterBuilder}; +use arrow::compute::{filter_record_batch, take}; use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::builder::Int32Builder; @@ -51,15 +51,15 @@ use futures::stream::{Stream, StreamExt}; use log::trace; /// Options that control how FilterExec processes batches -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub enum FilterKernel { /// Use Arrow's `filter_array` which can return input batches without copying data when the /// predicate evaluates to true for all rows in a batch - Arrow, + Default, /// Use the `take` kernel to produce filtered batches, ensuring that no input batches are /// passed through (can be useful for systems where the input to FilterExec can re-use /// arrays) - Take, + AlwaysCreateNewBatches, } /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to @@ -86,7 +86,7 @@ impl FilterExec { predicate: Arc, input: Arc, ) -> Result { - Self::try_new_with_kernel(predicate, input, FilterKernel::Arrow) + Self::try_new_with_kernel(predicate, input, FilterKernel::Default) } /// Create a FilterExec on an input using the specified kernel to create filtered batches @@ -310,6 +310,7 @@ impl ExecutionPlan for FilterExec { predicate: Arc::clone(&self.predicate), input: self.input.execute(partition, context)?, baseline_metrics, + filter_kernel: self.filter_kernel, })) } @@ -372,11 +373,14 @@ struct FilterExecStream { input: SendableRecordBatchStream, /// runtime metrics recording baseline_metrics: BaselineMetrics, + /// Filter kernel to use + filter_kernel: FilterKernel, } pub(crate) fn batch_filter( batch: &RecordBatch, predicate: &Arc, + filter_kernel: &FilterKernel, ) -> Result { predicate .evaluate(batch) @@ -384,7 +388,12 @@ pub(crate) fn batch_filter( .and_then(|array| { Ok(match as_boolean_array(&array) { // apply filter array to record batch - Ok(filter_array) => filter_record_batch(batch, filter_array)?, + Ok(filter_array) => match filter_kernel { + FilterKernel::Default => filter_record_batch(batch, filter_array)?, + FilterKernel::AlwaysCreateNewBatches => { + filter_record_batch_with_take(batch, filter_array)? + } + }, Err(_) => { return internal_err!( "Cannot create filter_array from non-boolean predicates" @@ -429,7 +438,8 @@ impl Stream for FilterExecStream { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { let timer = self.baseline_metrics.elapsed_compute().timer(); - let filtered_batch = batch_filter(&batch, &self.predicate)?; + let filtered_batch = + batch_filter(&batch, &self.predicate, &self.filter_kernel)?; timer.done(); // skip entirely filtered batches if filtered_batch.num_rows() == 0 { From 1a7aadf68db21bd4ccc1bd8963f6e9269ac68789 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 16:06:10 -0600 Subject: [PATCH 3/4] save --- datafusion/physical-plan/src/filter.rs | 67 +++++++++++++------------- 1 file changed, 34 insertions(+), 33 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 6ec102ea8df2..fb1912f3a50d 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -15,11 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{ready, Context, Poll}; - use super::{ ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -28,13 +23,16 @@ use crate::{ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, DisplayFormatType, ExecutionPlan, }; +use arrow::array::MutableArrayData; +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{ready, Context, Poll}; -use arrow::compute::{filter_record_batch, take}; +use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; -use arrow_array::builder::Int32Builder; -use arrow_array::{BooleanArray, RecordBatchOptions}; -use arrow_schema::ArrowError; +use arrow_array::{make_array, ArrayRef, RecordBatchOptions}; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; use datafusion_common::{internal_err, plan_err, DataFusionError, Result}; @@ -391,7 +389,33 @@ pub(crate) fn batch_filter( Ok(filter_array) => match filter_kernel { FilterKernel::Default => filter_record_batch(batch, filter_array)?, FilterKernel::AlwaysCreateNewBatches => { - filter_record_batch_with_take(batch, filter_array)? + if filter_array.true_count() == batch.num_rows() { + // special case where we just make an exact copy + let arrays: Vec = batch + .columns() + .iter() + .map(|array| { + let capacity = array.len(); + let data = array.to_data(); + let mut mutable = MutableArrayData::new( + vec![&data], + false, + capacity, + ); + mutable.extend(0, 0, capacity); + make_array(mutable.freeze()) + }) + .collect(); + let options = RecordBatchOptions::new() + .with_row_count(Some(batch.num_rows())); + RecordBatch::try_new_with_options( + batch.schema().clone(), + arrays, + &options, + )? + } else { + filter_record_batch(batch, filter_array)? + } } }, Err(_) => { @@ -403,29 +427,6 @@ pub(crate) fn batch_filter( }) } -fn filter_record_batch_with_take( - record_batch: &RecordBatch, - predicate: &BooleanArray, -) -> std::result::Result { - // turn predicate into selection vector - let mut sv = Int32Builder::with_capacity(predicate.true_count()); - for i in 0..predicate.len() { - if predicate.value(i) { - sv.append_value(i as i32); - } - } - let sv = sv.finish(); - - // use take to ensure that no input batches are passed through - let filtered_arrays = record_batch - .columns() - .iter() - .map(|a| take(a, &sv, None)) - .collect::, _>>()?; - let options = RecordBatchOptions::default().with_row_count(Some(sv.len())); - RecordBatch::try_new_with_options(record_batch.schema(), filtered_arrays, &options) -} - impl Stream for FilterExecStream { type Item = Result; From e1d0da6d1c68663b048efbeaafa859165f46b627 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Aug 2024 16:16:20 -0600 Subject: [PATCH 4/4] use boolean --- .../src/aggregates/no_grouping.rs | 6 +-- datafusion/physical-plan/src/filter.rs | 45 ++++++++----------- 2 files changed, 20 insertions(+), 31 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index bb8cb7dea8af..ecbf2ffc74cd 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -33,7 +33,7 @@ use std::borrow::Cow; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::filter::{batch_filter, FilterKernel}; +use crate::filter::batch_filter; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use futures::stream::{Stream, StreamExt}; @@ -215,9 +215,7 @@ fn aggregate_batch( .try_for_each(|((accum, expr), filter)| { // 1.2 let batch = match filter { - Some(filter) => { - Cow::Owned(batch_filter(&batch, filter, &FilterKernel::Default)?) - } + Some(filter) => Cow::Owned(batch_filter(&batch, filter, true)?), None => Cow::Borrowed(&batch), }; diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index fb1912f3a50d..8945441ad15f 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -48,18 +48,6 @@ use datafusion_physical_expr::{ use futures::stream::{Stream, StreamExt}; use log::trace; -/// Options that control how FilterExec processes batches -#[derive(Debug, Copy, Clone)] -pub enum FilterKernel { - /// Use Arrow's `filter_array` which can return input batches without copying data when the - /// predicate evaluates to true for all rows in a batch - Default, - /// Use the `take` kernel to produce filtered batches, ensuring that no input batches are - /// passed through (can be useful for systems where the input to FilterExec can re-use - /// arrays) - AlwaysCreateNewBatches, -} - /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to /// include in its output batches. #[derive(Debug)] @@ -74,8 +62,9 @@ pub struct FilterExec { default_selectivity: u8, /// Properties equivalence properties, partitioning, etc. cache: PlanProperties, - /// Which kernel to use when filtering batches - filter_kernel: FilterKernel, + /// Whether to allow an input batch to be returned unmodified in the case where + /// the predicate evaluates to true for all rows in the batch + reuse_input_batches: bool, } impl FilterExec { @@ -84,14 +73,14 @@ impl FilterExec { predicate: Arc, input: Arc, ) -> Result { - Self::try_new_with_kernel(predicate, input, FilterKernel::Default) + Self::try_new_with_reuse_input_batches(predicate, input, true) } /// Create a FilterExec on an input using the specified kernel to create filtered batches - pub fn try_new_with_kernel( + pub fn try_new_with_reuse_input_batches( predicate: Arc, input: Arc, - filter_kernel: FilterKernel, + reuse_input_batches: bool, ) -> Result { match predicate.data_type(input.schema().as_ref())? { DataType::Boolean => { @@ -104,7 +93,7 @@ impl FilterExec { metrics: ExecutionPlanMetricsSet::new(), default_selectivity, cache, - filter_kernel, + reuse_input_batches, }) } other => { @@ -308,7 +297,7 @@ impl ExecutionPlan for FilterExec { predicate: Arc::clone(&self.predicate), input: self.input.execute(partition, context)?, baseline_metrics, - filter_kernel: self.filter_kernel, + reuse_input_batches: self.reuse_input_batches, })) } @@ -371,14 +360,15 @@ struct FilterExecStream { input: SendableRecordBatchStream, /// runtime metrics recording baseline_metrics: BaselineMetrics, - /// Filter kernel to use - filter_kernel: FilterKernel, + /// Whether to allow an input batch to be returned unmodified in the case where + /// the predicate evaluates to true for all rows in the batch + reuse_input_batches: bool, } pub(crate) fn batch_filter( batch: &RecordBatch, predicate: &Arc, - filter_kernel: &FilterKernel, + reuse_input_batches: bool, ) -> Result { predicate .evaluate(batch) @@ -386,9 +376,10 @@ pub(crate) fn batch_filter( .and_then(|array| { Ok(match as_boolean_array(&array) { // apply filter array to record batch - Ok(filter_array) => match filter_kernel { - FilterKernel::Default => filter_record_batch(batch, filter_array)?, - FilterKernel::AlwaysCreateNewBatches => { + Ok(filter_array) => { + if reuse_input_batches { + filter_record_batch(batch, filter_array)? + } else { if filter_array.true_count() == batch.num_rows() { // special case where we just make an exact copy let arrays: Vec = batch @@ -417,7 +408,7 @@ pub(crate) fn batch_filter( filter_record_batch(batch, filter_array)? } } - }, + } Err(_) => { return internal_err!( "Cannot create filter_array from non-boolean predicates" @@ -440,7 +431,7 @@ impl Stream for FilterExecStream { Some(Ok(batch)) => { let timer = self.baseline_metrics.elapsed_compute().timer(); let filtered_batch = - batch_filter(&batch, &self.predicate, &self.filter_kernel)?; + batch_filter(&batch, &self.predicate, self.reuse_input_batches)?; timer.done(); // skip entirely filtered batches if filtered_batch.num_rows() == 0 {