2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -215,7 +215,7 @@ fn aggregate_batch(
.try_for_each(|((accum, expr), filter)| {
// 1.2
let batch = match filter {
-                Some(filter) => Cow::Owned(batch_filter(&batch, filter)?),
+                Some(filter) => Cow::Owned(batch_filter(&batch, filter, true)?),
None => Cow::Borrowed(&batch),
};

66 changes: 59 additions & 7 deletions datafusion/physical-plan/src/filter.rs
@@ -15,11 +15,6 @@
// specific language governing permissions and limitations
// under the License.

-use std::any::Any;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{ready, Context, Poll};
-
use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
@@ -28,10 +23,16 @@ use crate::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
DisplayFormatType, ExecutionPlan,
};
+use arrow::array::MutableArrayData;
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{ready, Context, Poll};

use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
+use arrow_array::{make_array, ArrayRef, RecordBatchOptions};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::stats::Precision;
use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
@@ -61,13 +62,25 @@ pub struct FilterExec {
default_selectivity: u8,
/// Properties equivalence properties, partitioning, etc.
cache: PlanProperties,
+    /// Whether to allow an input batch to be returned unmodified in the case where
+    /// the predicate evaluates to true for all rows in the batch
+    reuse_input_batches: bool,
}

impl FilterExec {
/// Create a FilterExec on an input
pub fn try_new(
predicate: Arc<dyn PhysicalExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
+        Self::try_new_with_reuse_input_batches(predicate, input, true)
+    }
+
+    /// Create a FilterExec on an input, specifying whether an all-true predicate
+    /// may return the input batch unmodified (`true`) or must copy it (`false`)
+    pub fn try_new_with_reuse_input_batches(
+        predicate: Arc<dyn PhysicalExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        reuse_input_batches: bool,
+    ) -> Result<Self> {
match predicate.data_type(input.schema().as_ref())? {
DataType::Boolean => {
@@ -80,6 +93,7 @@ impl FilterExec {
metrics: ExecutionPlanMetricsSet::new(),
default_selectivity,
cache,
+            reuse_input_batches,
})
}
other => {
@@ -283,6 +297,7 @@ impl ExecutionPlan for FilterExec {
predicate: Arc::clone(&self.predicate),
input: self.input.execute(partition, context)?,
baseline_metrics,
+            reuse_input_batches: self.reuse_input_batches,
}))
}
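For callers the new flag only changes construction; a hypothetical usage sketch (predicate and input are assumed to be built elsewhere):

    // Default behavior: a batch whose rows all pass the predicate may be
    // returned as-is, sharing the input's buffers (zero-copy).
    let filter = FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&input))?;

    // Opt out of reuse: every output batch owns freshly copied buffers,
    // e.g. for consumers such as Comet/Spark that recycle input arrays.
    let copying_filter =
        FilterExec::try_new_with_reuse_input_batches(predicate, input, false)?;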

@@ -345,19 +360,55 @@ struct FilterExecStream {
input: SendableRecordBatchStream,
/// runtime metrics recording
baseline_metrics: BaselineMetrics,
+    /// Whether to allow an input batch to be returned unmodified in the case where
+    /// the predicate evaluates to true for all rows in the batch
+    reuse_input_batches: bool,
}

pub(crate) fn batch_filter(
batch: &RecordBatch,
predicate: &Arc<dyn PhysicalExpr>,
+    reuse_input_batches: bool,
) -> Result<RecordBatch> {
predicate
.evaluate(batch)
.and_then(|v| v.into_array(batch.num_rows()))
.and_then(|array| {
Ok(match as_boolean_array(&array) {
// apply filter array to record batch
-                Ok(filter_array) => filter_record_batch(batch, filter_array)?,
+                Ok(filter_array) => {
+                    if reuse_input_batches {
+                        filter_record_batch(batch, filter_array)?
+                    } else {
+                        if filter_array.true_count() == batch.num_rows() {
@Dandandan (Contributor) commented on Aug 17, 2024:

As computing the true count is not free, I am wondering if we can either

  • move this to the arrow filter compute kernel
  • check the returned array(s) for pointer equality, and copy if equal

Contributor:

Specifically, I think the optimize function would be a natural place to put this: https://docs.rs/arrow-select/52.2.0/src/arrow_select/filter.rs.html#181
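A minimal sketch of the second idea (a hypothetical helper, not part of this PR; it compares only top-level data buffers and ignores the child data of nested arrays):

    use arrow_array::{Array, ArrayRef};

    /// Returns true if `filtered` shares any top-level data buffer with
    /// `input`; a caller that needs owned data would then deep-copy.
    fn shares_buffers(input: &ArrayRef, filtered: &ArrayRef) -> bool {
        let a = input.to_data();
        let b = filtered.to_data();
        a.buffers()
            .iter()
            .zip(b.buffers().iter())
            .any(|(x, y)| x.as_ptr() == y.as_ptr())
    }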

+                            // special case where we just make an exact copy
Contributor:

I think I am missing something -- why go through all the effort with MutableArrayData? In other words, why isn't this simply

    Ok(batch.clone())

to literally return the input batch? I think that would be less code and faster.

Contributor:

That would be the same as the filter. The use case, as far as I understand it, is that for Comet the data needs to be a new copy, as Spark will reuse the existing data/arrays.

@alamb (Contributor) commented on Aug 19, 2024:

I see -- I think it would help if we made a function with a name that makes it clear what is going on, something like

    fn force_new_data_copy(...)

I also left a suggestion here: apache/datafusion-comet#835 (comment)
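A sketch of that refactor (force_new_data_copy is only the reviewer's suggested name; the body mirrors the MutableArrayData loop in the diff below):

    use arrow::array::MutableArrayData;
    use arrow::record_batch::RecordBatch;
    use arrow_array::{make_array, Array, ArrayRef, RecordBatchOptions};
    use datafusion_common::Result;

    /// Deep-copy every column so the returned batch shares no buffers
    /// with `batch`.
    fn force_new_data_copy(batch: &RecordBatch) -> Result<RecordBatch> {
        let arrays: Vec<ArrayRef> = batch
            .columns()
            .iter()
            .map(|array| {
                let data = array.to_data();
                let mut mutable = MutableArrayData::new(vec![&data], false, array.len());
                mutable.extend(0, 0, array.len());
                make_array(mutable.freeze())
            })
            .collect();
        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
        Ok(RecordBatch::try_new_with_options(
            batch.schema(),
            arrays,
            &options,
        )?)
    }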

+                            let arrays: Vec<ArrayRef> = batch
+                                .columns()
+                                .iter()
+                                .map(|array| {
+                                    let capacity = array.len();
+                                    let data = array.to_data();
+                                    let mut mutable = MutableArrayData::new(
+                                        vec![&data],
+                                        false,
+                                        capacity,
+                                    );
+                                    mutable.extend(0, 0, capacity);
+                                    make_array(mutable.freeze())
+                                })
+                                .collect();
+                            let options = RecordBatchOptions::new()
+                                .with_row_count(Some(batch.num_rows()));
+                            RecordBatch::try_new_with_options(
+                                batch.schema().clone(),
+                                arrays,
+                                &options,
+                            )?
+                        } else {
+                            filter_record_batch(batch, filter_array)?
+                        }
+                    }
+                }
Err(_) => {
return internal_err!(
"Cannot create filter_array from non-boolean predicates"
@@ -379,7 +430,8 @@ impl Stream for FilterExecStream {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
-                    let filtered_batch = batch_filter(&batch, &self.predicate)?;
+                    let filtered_batch =
Contributor:

I recommend adding a test so we don't accidentally break this feature.
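A hedged sketch of such a test (assumes an always-true literal predicate built with datafusion_physical_expr::expressions::lit; the buffer-pointer assertion relies on arrow returning zero-copy slices for all-true filters):

    #[cfg(test)]
    mod reuse_input_batches_tests {
        use super::*;
        use arrow::array::Int32Array;
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow_array::Array;
        use datafusion_physical_expr::expressions::lit;

        #[test]
        fn batch_filter_copies_when_reuse_disabled() -> Result<()> {
            let schema =
                Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
            let batch = RecordBatch::try_new(
                schema,
                vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
            )?;
            let predicate = lit(true); // keeps every row
            let filtered = batch_filter(&batch, &predicate, false)?;
            assert_eq!(filtered.num_rows(), batch.num_rows());
            // the copied column must not share a data buffer with the input
            let before = batch.column(0).to_data();
            let after = filtered.column(0).to_data();
            assert_ne!(before.buffers()[0].as_ptr(), after.buffers()[0].as_ptr());
            Ok(())
        }
    }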

+                        batch_filter(&batch, &self.predicate, self.reuse_input_batches)?;
timer.done();
// skip entirely filtered batches
if filtered_batch.num_rows() == 0 {