Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 46 additions & 9 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
use async_trait::async_trait;
use datafusion_common::{Column, DFSchema};
use datafusion_expr::TableProviderFilterPushDown;
use parking_lot::RwLock;
use parquet::file::properties::WriterProperties;
use std::any::Any;
Expand Down Expand Up @@ -773,6 +773,14 @@ impl TableProvider for DataFrame {
self
}

fn supports_filter_pushdown(
&self,
_filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
// A filter is added on the DataFrame when given
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Ok(TableProviderFilterPushDown::Exact)
}

fn schema(&self) -> SchemaRef {
let schema: Schema = self.plan.schema().as_ref().into();
Arc::new(schema)
Expand All @@ -789,7 +797,7 @@ impl TableProvider for DataFrame {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let expr = projection
let mut expr = projection
.as_ref()
// construct projections
.map_or_else(
Expand All @@ -806,12 +814,12 @@ impl TableProvider for DataFrame {
.collect::<Vec<_>>();
self.select_columns(names.as_slice())
},
)?
// add predicates, otherwise use `true` as the predicate
.filter(filters.iter().cloned().fold(
Expr::Literal(ScalarValue::Boolean(Some(true))),
|acc, new| acc.and(new),
))?;
)?;
// Add filter when given
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
if let Some(filter) = filter {
expr = expr.filter(filter)?
}
// add a limit if given
Self::new(
self.session_state.clone(),
Expand All @@ -830,9 +838,10 @@ mod tests {
use std::vec;

use super::*;
use crate::execution::options::CsvReadOptions;
use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
use crate::physical_plan::ColumnarValue;
use crate::test_util;
use crate::test_util::parquet_test_data;
use crate::{assert_batches_sorted_eq, execution::context::SessionContext};
use arrow::array::Int32Array;
use arrow::datatypes::DataType;
Expand Down Expand Up @@ -1349,6 +1358,34 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn filter_pushdown_dataframe() -> Result<()> {
let ctx = SessionContext::new();

ctx.register_parquet(
"test",
&format!("{}/alltypes_plain.snappy.parquet", parquet_test_data()),
ParquetReadOptions::default(),
)
.await?;

ctx.register_table("t1", ctx.table("test")?)?;

let df = ctx
.table("t1")?
.filter(col("id").eq(lit(1)))?
.select_columns(&["bool_col", "int_col"])?;

let plan = df.explain(false, false)?.collect().await?;
// Filters all the way to Parquet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
assert!(formatted.contains("predicate=id_min@0 <= 1 AND 1 <= id_max@1"));

Ok(())
}

#[tokio::test]
async fn cast_expr_test() -> Result<()> {
let df = test_table()
Expand Down
99 changes: 90 additions & 9 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{any::Any, sync::Arc};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_expr::LogicalPlanBuilder;
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};

use crate::{
error::Result,
Expand Down Expand Up @@ -89,22 +89,30 @@ impl TableProvider for ViewTable {
self.definition.as_deref()
}

fn supports_filter_pushdown(
&self,
_filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
// A filter is added on the View when given
Ok(TableProviderFilterPushDown::Exact)
}

async fn scan(
&self,
state: &SessionState,
projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// clone state and start_execution so that now() works in views
let mut state_cloned = state.clone();
state_cloned.execution_props.start_execution();
if let Some(projection) = projection {
let plan = if let Some(projection) = projection {
// avoiding adding a redundant projection (e.g. SELECT * FROM view)
let current_projection =
(0..self.logical_plan.schema().fields().len()).collect::<Vec<usize>>();
if projection == &current_projection {
state_cloned.create_physical_plan(&self.logical_plan).await
self.logical_plan().clone()
} else {
let fields: Vec<Expr> = projection
.iter()
Expand All @@ -114,20 +122,35 @@ impl TableProvider for ViewTable {
)
})
.collect();
let plan = LogicalPlanBuilder::from(self.logical_plan.clone())
LogicalPlanBuilder::from(self.logical_plan.clone())
.project(fields)?
.build()?;
state_cloned.create_physical_plan(&plan).await
.build()?
}
} else {
state_cloned.create_physical_plan(&self.logical_plan).await
self.logical_plan().clone()
};
let mut plan = LogicalPlanBuilder::from(plan);
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use conjunction here and above too if you wanted : https://github.com/apache/arrow-datafusion/blob/master/datafusion/optimizer/src/utils.rs#L123-L147

it just does the same thing, so I don't feel strongly


if let Some(filter) = filter {
plan = plan.filter(filter)?;
}

if let Some(limit) = limit {
plan = plan.limit(0, Some(limit))?;
}

state_cloned.create_physical_plan(&plan.build()?).await
}
}

#[cfg(test)]
mod tests {
use datafusion_expr::{col, lit};

use crate::execution::options::ParquetReadOptions;
use crate::prelude::SessionContext;
use crate::test_util::parquet_test_data;
use crate::{assert_batches_eq, execution::context::SessionConfig};

use super::*;
Expand Down Expand Up @@ -393,6 +416,64 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn filter_pushdown_view() -> Result<()> {
let ctx = SessionContext::new();

ctx.register_parquet(
"test",
&format!("{}/alltypes_plain.snappy.parquet", parquet_test_data()),
ParquetReadOptions::default(),
)
.await?;

ctx.register_table("t1", ctx.table("test")?)?;

ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;

let df = ctx
.table("t2")?
.filter(col("id").eq(lit(1)))?
.select_columns(&["bool_col", "int_col"])?;

let plan = df.explain(false, false)?.collect().await?;
// Filters all the way to Parquet
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
assert!(formatted.contains("predicate=id_min@0 <= 1 AND 1 <= id_max@1"));
Ok(())
}

#[tokio::test]
async fn limit_pushdown_view() -> Result<()> {
let ctx = SessionContext::new();

ctx.register_parquet(
"test",
&format!("{}/alltypes_plain.snappy.parquet", parquet_test_data()),
ParquetReadOptions::default(),
)
.await?;

ctx.register_table("t1", ctx.table("test")?)?;

ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;

let df = ctx
.table("t2")?
.limit(0, Some(10))?
.select_columns(&["bool_col", "int_col"])?;

let plan = df.explain(false, false)?.collect().await?;
// Limit is included in ParquetExec
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
assert!(formatted.contains("ParquetExec: limit=Some(10)"));
Ok(())
}

#[tokio::test]
async fn create_view_plan() -> Result<()> {
let session_ctx = SessionContext::with_config(
Expand Down