diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index c52f8259f2d0..4e9798df9ab0 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -36,9 +36,9 @@ use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{collect, collect_partitioned}; use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan}; use crate::prelude::SessionContext; -use crate::scalar::ScalarValue; use async_trait::async_trait; use datafusion_common::{Column, DFSchema}; +use datafusion_expr::TableProviderFilterPushDown; use parking_lot::RwLock; use parquet::file::properties::WriterProperties; use std::any::Any; @@ -773,6 +773,14 @@ impl TableProvider for DataFrame { self } + fn supports_filter_pushdown( + &self, + _filter: &Expr, + ) -> Result { + // A filter is added on the DataFrame when given + Ok(TableProviderFilterPushDown::Exact) + } + fn schema(&self) -> SchemaRef { let schema: Schema = self.plan.schema().as_ref().into(); Arc::new(schema) @@ -789,7 +797,7 @@ impl TableProvider for DataFrame { filters: &[Expr], limit: Option, ) -> Result> { - let expr = projection + let mut expr = projection .as_ref() // construct projections .map_or_else( @@ -806,12 +814,12 @@ impl TableProvider for DataFrame { .collect::>(); self.select_columns(names.as_slice()) }, - )? - // add predicates, otherwise use `true` as the predicate - .filter(filters.iter().cloned().fold( - Expr::Literal(ScalarValue::Boolean(Some(true))), - |acc, new| acc.and(new), - ))?; + )?; + // Add filter when given + let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new)); + if let Some(filter) = filter { + expr = expr.filter(filter)? + } // add a limit if given Self::new( self.session_state.clone(), @@ -830,9 +838,10 @@ mod tests { use std::vec; use super::*; - use crate::execution::options::CsvReadOptions; + use crate::execution::options::{CsvReadOptions, ParquetReadOptions}; use crate::physical_plan::ColumnarValue; use crate::test_util; + use crate::test_util::parquet_test_data; use crate::{assert_batches_sorted_eq, execution::context::SessionContext}; use arrow::array::Int32Array; use arrow::datatypes::DataType; @@ -1349,6 +1358,34 @@ mod tests { Ok(()) } + #[tokio::test] + async fn filter_pushdown_dataframe() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.register_parquet( + "test", + &format!("{}/alltypes_plain.snappy.parquet", parquet_test_data()), + ParquetReadOptions::default(), + ) + .await?; + + ctx.register_table("t1", ctx.table("test")?)?; + + let df = ctx + .table("t1")? + .filter(col("id").eq(lit(1)))? + .select_columns(&["bool_col", "int_col"])?; + + let plan = df.explain(false, false)?.collect().await?; + // Filters all the way to Parquet + let formatted = arrow::util::pretty::pretty_format_batches(&plan) + .unwrap() + .to_string(); + assert!(formatted.contains("predicate=id_min@0 <= 1 AND 1 <= id_max@1")); + + Ok(()) + } + #[tokio::test] async fn cast_expr_test() -> Result<()> { let df = test_table() diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 3cd9189dbfc1..dfe6d42f8e82 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -21,7 +21,7 @@ use std::{any::Any, sync::Arc}; use arrow::datatypes::SchemaRef; use async_trait::async_trait; -use datafusion_expr::LogicalPlanBuilder; +use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown}; use crate::{ error::Result, @@ -89,22 +89,30 @@ impl TableProvider for ViewTable { self.definition.as_deref() } + fn supports_filter_pushdown( + &self, + _filter: &Expr, + ) -> Result { + // A filter is added on the View when given + Ok(TableProviderFilterPushDown::Exact) + } + async fn scan( &self, state: &SessionState, projection: &Option>, - _filters: &[Expr], - _limit: Option, + filters: &[Expr], + limit: Option, ) -> Result> { // clone state and start_execution so that now() works in views let mut state_cloned = state.clone(); state_cloned.execution_props.start_execution(); - if let Some(projection) = projection { + let plan = if let Some(projection) = projection { // avoiding adding a redundant projection (e.g. SELECT * FROM view) let current_projection = (0..self.logical_plan.schema().fields().len()).collect::>(); if projection == ¤t_projection { - state_cloned.create_physical_plan(&self.logical_plan).await + self.logical_plan().clone() } else { let fields: Vec = projection .iter() @@ -114,20 +122,35 @@ impl TableProvider for ViewTable { ) }) .collect(); - let plan = LogicalPlanBuilder::from(self.logical_plan.clone()) + LogicalPlanBuilder::from(self.logical_plan.clone()) .project(fields)? - .build()?; - state_cloned.create_physical_plan(&plan).await + .build()? } } else { - state_cloned.create_physical_plan(&self.logical_plan).await + self.logical_plan().clone() + }; + let mut plan = LogicalPlanBuilder::from(plan); + let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new)); + + if let Some(filter) = filter { + plan = plan.filter(filter)?; } + + if let Some(limit) = limit { + plan = plan.limit(0, Some(limit))?; + } + + state_cloned.create_physical_plan(&plan.build()?).await } } #[cfg(test)] mod tests { + use datafusion_expr::{col, lit}; + + use crate::execution::options::ParquetReadOptions; use crate::prelude::SessionContext; + use crate::test_util::parquet_test_data; use crate::{assert_batches_eq, execution::context::SessionConfig}; use super::*; @@ -393,6 +416,64 @@ mod tests { Ok(()) } + #[tokio::test] + async fn filter_pushdown_view() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.register_parquet( + "test", + &format!("{}/alltypes_plain.snappy.parquet", parquet_test_data()), + ParquetReadOptions::default(), + ) + .await?; + + ctx.register_table("t1", ctx.table("test")?)?; + + ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?; + + let df = ctx + .table("t2")? + .filter(col("id").eq(lit(1)))? + .select_columns(&["bool_col", "int_col"])?; + + let plan = df.explain(false, false)?.collect().await?; + // Filters all the way to Parquet + let formatted = arrow::util::pretty::pretty_format_batches(&plan) + .unwrap() + .to_string(); + assert!(formatted.contains("predicate=id_min@0 <= 1 AND 1 <= id_max@1")); + Ok(()) + } + + #[tokio::test] + async fn limit_pushdown_view() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.register_parquet( + "test", + &format!("{}/alltypes_plain.snappy.parquet", parquet_test_data()), + ParquetReadOptions::default(), + ) + .await?; + + ctx.register_table("t1", ctx.table("test")?)?; + + ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?; + + let df = ctx + .table("t2")? + .limit(0, Some(10))? + .select_columns(&["bool_col", "int_col"])?; + + let plan = df.explain(false, false)?.collect().await?; + // Limit is included in ParquetExec + let formatted = arrow::util::pretty::pretty_format_batches(&plan) + .unwrap() + .to_string(); + assert!(formatted.contains("ParquetExec: limit=Some(10)")); + Ok(()) + } + #[tokio::test] async fn create_view_plan() -> Result<()> { let session_ctx = SessionContext::with_config(