ARROW-11074: [Rust][DataFusion] Implement predicate push-down for parquet tables

While profiling a DataFusion query, I found that the code spends a lot of time reading data from parquet files. Predicate / filter push-down is a commonly used performance optimization, where statistics stored in parquet files (such as min / max values for columns in a parquet row group) are evaluated against query filters to determine which row groups could contain data requested by a query. By pushing query filters all the way down to the parquet data source, entire row groups or even whole parquet files can be skipped, often resulting in significant performance improvements.

I have been working on an implementation for a few weeks and initial results look promising - with predicate push-down, DataFusion is now faster than Apache Spark for the same query against the same parquet files (`140ms for DataFusion vs 200ms for Spark`). Without predicate push-down into parquet, DataFusion takes about 2 - 3s (depending on concurrency) for the same query, because the data is ordered and most files don't contain data that satisfies the query filters, yet they are still loaded and processed in vain.

This work is based on the following key ideas:
* predicate push-down is implemented by filtering row group metadata entries down to only those that could contain data satisfying the query filters
* the existing DataFusion code for evaluating physical expressions is reused rather than reimplemented
* filter expressions pushed down to a parquet table are rewritten to use parquet statistics instead of the actual column data, for example `(column / 2) = 4` becomes `(column_min / 2) <= 4 && 4 <= (column_max / 2)` - this is done once for all files in a parquet table (see the sketch after this list)
* for each parquet file, a RecordBatch containing all required statistics columns ([`column_min`, `column_max`] in the example above) is produced, and the predicate expression from the previous step is evaluated against it, producing a boolean array which is finally used to filter the row groups in each parquet file
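
As a rough illustration of the rewrite step for a simple equality filter, here is a minimal sketch using the DataFusion logical-plan helpers that also appear in the tests further down; `rewrite_eq_to_stats` and the `_min` / `_max` column naming are assumptions for illustration, not the code in this commit:

```rust
use datafusion::logical_plan::{and, binary_expr, col, Expr, Operator};

// Hypothetical helper: rewrite `column = value` into a predicate over the
// row-group statistics columns `column_min` / `column_max`. A row group can
// only contain matching rows if `column_min <= value AND value <= column_max`.
fn rewrite_eq_to_stats(column: &str, value: Expr) -> Expr {
    let lower = binary_expr(col(&format!("{}_min", column)), Operator::LtEq, value.clone());
    let upper = binary_expr(col(&format!("{}_max", column)), Operator::GtEq, value);
    and(lower, upper)
}

// For example, rewrite_eq_to_stats("c1", lit(4)) corresponds to
// `c1_min <= 4 AND 4 <= c1_max`.
```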

This is still a work in progress - more tests are left to write; I am publishing it now to gather feedback.

@andygrove let me know what you think

Closes #9064 from yordan-pavlov/parquet_predicate_push_down

Authored-by: Yordan Pavlov <yordan.pavlov@outlook.com>
Signed-off-by: Andrew Lamb <andrew@nerdnetworks.org>
yordan-pavlov authored and alamb committed Jan 19, 2021
1 parent e20f439 commit 18dc62c
Showing 9 changed files with 1,137 additions and 25 deletions.
15 changes: 15 additions & 0 deletions rust/arrow/src/array/null.rs
@@ -52,6 +52,12 @@ impl NullArray {
        let array_data = ArrayData::builder(DataType::Null).len(length).build();
        NullArray::from(array_data)
    }

    /// Create a new null array of the specified length and type
    pub fn new_with_type(length: usize, data_type: DataType) -> Self {
        let array_data = ArrayData::builder(data_type).len(length).build();
        NullArray::from(array_data)
    }
}

impl Array for NullArray {
@@ -147,6 +153,15 @@ mod tests {
        assert_eq!(array2.offset(), 8);
    }

    #[test]
    fn test_null_array_new_with_type() {
        let length = 10;
        let data_type = DataType::Int8;
        let array = NullArray::new_with_type(length, data_type.clone());
        assert_eq!(array.len(), length);
        assert_eq!(array.data_type(), &data_type);
    }

    #[test]
    fn test_debug_null_array() {
        let array = NullArray::new(1024 * 1024);
56 changes: 54 additions & 2 deletions rust/datafusion/src/datasource/parquet.rs
@@ -30,6 +30,8 @@ use crate::logical_plan::Expr;
use crate::physical_plan::parquet::ParquetExec;
use crate::physical_plan::ExecutionPlan;

use super::datasource::TableProviderFilterPushDown;

/// Table-based representation of a `ParquetFile`.
pub struct ParquetTable {
    path: String,
@@ -41,7 +43,7 @@ pub struct ParquetTable {
impl ParquetTable {
    /// Attempt to initialize a new `ParquetTable` from a file path.
    pub fn try_new(path: &str, max_concurrency: usize) -> Result<Self> {
        let parquet_exec = ParquetExec::try_from_path(path, None, 0, 1)?;
        let parquet_exec = ParquetExec::try_from_path(path, None, None, 0, 1)?;
        let schema = parquet_exec.schema();
        Ok(Self {
            path: path.to_string(),
@@ -67,17 +69,26 @@ impl TableProvider for ParquetTable {
        self.schema.clone()
    }

    fn supports_filter_pushdown(
        &self,
        _filter: &Expr,
    ) -> Result<TableProviderFilterPushDown> {
        Ok(TableProviderFilterPushDown::Inexact)
    }

    /// Scan the file(s), using the provided projection, and return one BatchIterator per
    /// partition.
    fn scan(
        &self,
        projection: &Option<Vec<usize>>,
        batch_size: usize,
        _filters: &[Expr],
        filters: &[Expr],
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let predicate = combine_filters(filters);
        Ok(Arc::new(ParquetExec::try_from_path(
            &self.path,
            projection.clone(),
            predicate,
            batch_size,
            self.max_concurrency,
        )?))
@@ -88,6 +99,22 @@ impl TableProvider for ParquetTable {
    }
}

/// Combines an array of filter expressions into a single filter expression
/// consisting of the input filter expressions joined with logical AND.
/// Returns None if the filters array is empty.
fn combine_filters(filters: &[Expr]) -> Option<Expr> {
    if filters.is_empty() {
        return None;
    }
    let combined_filter = filters
        .iter()
        .skip(1)
        .fold(filters[0].clone(), |acc, filter| {
            crate::logical_plan::and(acc, filter.clone())
        });
    Some(combined_filter)
}

#[cfg(test)]
mod tests {
    use super::*;
@@ -333,4 +360,29 @@ mod tests {
            .expect("should have received at least one batch")
            .map_err(|e| e.into())
    }

    #[test]
    fn combine_zero_filters() {
        let result = combine_filters(&[]);
        assert_eq!(result, None);
    }

    #[test]
    fn combine_one_filter() {
        use crate::logical_plan::{binary_expr, col, lit, Operator};
        let filter = binary_expr(col("c1"), Operator::Lt, lit(1));
        let result = combine_filters(&[filter.clone()]);
        assert_eq!(result, Some(filter));
    }

    #[test]
    fn combine_multiple_filters() {
        use crate::logical_plan::{and, binary_expr, col, lit, Operator};
        let filter1 = binary_expr(col("c1"), Operator::Lt, lit(1));
        let filter2 = binary_expr(col("c2"), Operator::Lt, lit(2));
        let filter3 = binary_expr(col("c3"), Operator::Lt, lit(3));
        let result =
            combine_filters(&[filter1.clone(), filter2.clone(), filter3.clone()]);
        assert_eq!(result, Some(and(and(filter1, filter2), filter3)));
    }
}
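
For context, here is a hedged sketch of how a caller might exercise the new `filters` argument to `scan` shown above; the file path, column name, and exact import paths are assumptions for illustration rather than code from this commit:

```rust
use std::sync::Arc;

use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_plan::{binary_expr, col, lit, Operator};
use datafusion::physical_plan::ExecutionPlan;

fn scan_with_pushdown() -> Result<Arc<dyn ExecutionPlan>> {
    // Hypothetical parquet file with a numeric column `c1`.
    let table = ParquetTable::try_new("/tmp/example.parquet", 4)?;
    // Filters passed to scan() are combined by combine_filters() and handed to
    // ParquetExec, which can use row-group statistics to prune row groups that
    // cannot match.
    let filter = binary_expr(col("c1"), Operator::Lt, lit(100));
    table.scan(&None, 1024, &[filter])
}
```
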
2 changes: 1 addition & 1 deletion rust/datafusion/src/logical_plan/operators.rs
@@ -20,7 +20,7 @@ use std::{fmt, ops};
use super::{binary_expr, Expr};

/// Operators applied to expressions
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Operator {
    /// Expressions are equal
    Eq,
2 changes: 1 addition & 1 deletion rust/datafusion/src/optimizer/utils.rs
@@ -332,7 +332,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &Vec<Expr>) -> Result<Expr>
    match expr {
        Expr::BinaryExpr { op, .. } => Ok(Expr::BinaryExpr {
            left: Box::new(expressions[0].clone()),
            op: op.clone(),
            op: *op,
            right: Box::new(expressions[1].clone()),
        }),
        Expr::IsNull(_) => Ok(Expr::IsNull(Box::new(expressions[0].clone()))),