diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt index e2f59dc5ca0f..e78f8e0d9887 100644 --- a/benchmarks/expected-plans/q15.txt +++ b/benchmarks/expected-plans/q15.txt @@ -4,8 +4,18 @@ Sort: supplier.s_suppkey ASC NULLS LAST Inner Join: revenue0.total_revenue = __sq_1.__value Inner Join: supplier.s_suppkey = revenue0.supplier_no TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone] - TableScan: revenue0 projection=[supplier_no, total_revenue] + Projection: supplier_no, total_revenue, alias=revenue0 + Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue + Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) + Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] + Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") + TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1 Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]] - TableScan: revenue0 projection=[total_revenue] + Projection: total_revenue, alias=revenue0 + Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue + Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) + Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] + Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") + TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] EmptyRelation \ No newline at end of file diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 4e9798df9ab0..a699a234cd6d 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -773,6 +773,10 @@ impl TableProvider for DataFrame { self } + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + Some(&self.plan) + } + fn supports_filter_pushdown( &self, _filter: &Expr, @@ -1337,8 +1341,12 @@ mod tests { \n Limit: skip=0, fetch=1\ \n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\ \n Inner Join: t1.c1 = t2.c1\ - \n TableScan: t1 projection=[c1, c2, c3]\ - \n TableScan: t2 projection=[c1, c2, c3]", + \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\ + \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ + \n TableScan: aggregate_test_100 projection=[c1, c2, c3]\ + \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\ + \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ + \n TableScan: aggregate_test_100 projection=[c1, c2, c3]", format!("{:?}", df_renamed.to_logical_plan()?) ); diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs index 27074734709d..84111fed06ca 100644 --- a/datafusion/core/src/datasource/datasource.rs +++ b/datafusion/core/src/datasource/datasource.rs @@ -21,6 +21,7 @@ use std::any::Any; use std::sync::Arc; use async_trait::async_trait; +use datafusion_expr::LogicalPlan; pub use datafusion_expr::{TableProviderFilterPushDown, TableType}; use crate::arrow::datatypes::SchemaRef; @@ -47,6 +48,11 @@ pub trait TableProvider: Sync + Send { None } + /// Get the Logical Plan of this table, if available. + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + None + } + /// Create an ExecutionPlan that will scan the table. /// The table provider will be usually responsible of grouping /// the source data into partitions that can be efficiently diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs index 2e65be0bcc3a..bbb9fbdd6492 100644 --- a/datafusion/core/src/datasource/default_table_source.rs +++ b/datafusion/core/src/datasource/default_table_source.rs @@ -60,6 +60,10 @@ impl TableSource for DefaultTableSource { ) -> datafusion_common::Result { self.table_provider.supports_filter_pushdown(filter) } + + fn get_logical_plan(&self) -> Option<&datafusion_expr::LogicalPlan> { + self.table_provider.get_logical_plan() + } } /// Wrap TableProvider in TableSource diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index dfe6d42f8e82..043ce256b749 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -77,6 +77,10 @@ impl TableProvider for ViewTable { self } + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + Some(&self.logical_plan) + } + fn schema(&self) -> SchemaRef { Arc::clone(&self.table_schema) } diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs index 990022bbf199..10984f779936 100644 --- a/datafusion/expr/src/table_source.rs +++ b/datafusion/expr/src/table_source.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::Expr; +use crate::{Expr, LogicalPlan}; use arrow::datatypes::SchemaRef; use std::any::Any; @@ -76,4 +76,9 @@ pub trait TableSource: Sync + Send { ) -> datafusion_common::Result { Ok(TableProviderFilterPushDown::Unsupported) } + + /// Get the Logical plan of this table provider, if available. + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + None + } } diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs new file mode 100644 index 000000000000..89c78405ae67 --- /dev/null +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Optimizer rule to replace TableScan references +//! such as DataFrames and Views and inlines the LogicalPlan +//! to support further optimization +use crate::{OptimizerConfig, OptimizerRule}; +use datafusion_common::Result; +use datafusion_expr::{ + logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan, +}; + +/// Optimization rule that inlines TableScan that provide a [LogicalPlan] +/// ([DataFrame] / [ViewTable]) +#[derive(Default)] +pub struct InlineTableScan; + +impl InlineTableScan { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +/// Inline +fn inline_table_scan(plan: &LogicalPlan) -> Result { + match plan { + // Match only on scans without filter / projection / fetch + // Views and DataFrames won't have those added + // during the early stage of planning + LogicalPlan::TableScan(TableScan { + source, + table_name, + filters, + fetch: None, + .. + }) if filters.is_empty() => { + if let Some(sub_plan) = source.get_logical_plan() { + // Recursively apply optimization + let plan = inline_table_scan(sub_plan)?; + let plan = LogicalPlanBuilder::from(plan).project_with_alias( + vec![Expr::Wildcard], + Some(table_name.to_string()), + )?; + plan.build() + } else { + // No plan available, return with table scan as is + Ok(plan.clone()) + } + } + + // Rest: Recurse + _ => { + // apply the optimization to all inputs of the plan + let inputs = plan.inputs(); + let new_inputs = inputs + .iter() + .map(|plan| inline_table_scan(plan)) + .collect::>>()?; + + from_plan(plan, &plan.expressions(), &new_inputs) + } + } +} + +impl OptimizerRule for InlineTableScan { + fn optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result { + inline_table_scan(plan) + } + + fn name(&self) -> &str { + "inline_table_scan" + } +} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, vec}; + + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource}; + + use crate::{inline_table_scan::InlineTableScan, OptimizerConfig, OptimizerRule}; + + pub struct RawTableSource {} + + impl TableSource for RawTableSource { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])) + } + + fn supports_filter_pushdown( + &self, + _filter: &datafusion_expr::Expr, + ) -> datafusion_common::Result + { + Ok(datafusion_expr::TableProviderFilterPushDown::Inexact) + } + } + + pub struct CustomSource { + plan: LogicalPlan, + } + impl CustomSource { + fn new() -> Self { + Self { + plan: LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None) + .unwrap() + .build() + .unwrap(), + } + } + } + impl TableSource for CustomSource { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn supports_filter_pushdown( + &self, + _filter: &datafusion_expr::Expr, + ) -> datafusion_common::Result + { + Ok(datafusion_expr::TableProviderFilterPushDown::Exact) + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])) + } + + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + Some(&self.plan) + } + } + + #[test] + fn inline_table_scan() { + let rule = InlineTableScan::new(); + + let source = Arc::new(CustomSource::new()); + + let scan = LogicalPlanBuilder::scan("x".to_string(), source, None).unwrap(); + + let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap(); + + let optimized_plan = rule + .optimize(&plan, &mut OptimizerConfig::new()) + .expect("failed to optimize plan"); + let formatted_plan = format!("{:?}", optimized_plan); + let expected = "\ + Filter: x.a = Int32(1)\ + \n Projection: y.a, alias=x\ + \n TableScan: y"; + + assert_eq!(formatted_plan, expected); + assert_eq!(plan.schema(), optimized_plan.schema()); + } +} diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 23814dcf9833..5e8108d6766e 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -23,6 +23,7 @@ pub mod eliminate_limit; pub mod expr_simplifier; pub mod filter_null_join_keys; pub mod filter_push_down; +pub mod inline_table_scan; pub mod limit_push_down; pub mod optimizer; pub mod projection_push_down; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 976131e047a8..7c37284e6fe8 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -24,6 +24,7 @@ use crate::eliminate_filter::EliminateFilter; use crate::eliminate_limit::EliminateLimit; use crate::filter_null_join_keys::FilterNullJoinKeys; use crate::filter_push_down::FilterPushDown; +use crate::inline_table_scan::InlineTableScan; use crate::limit_push_down::LimitPushDown; use crate::projection_push_down::ProjectionPushDown; use crate::reduce_cross_join::ReduceCrossJoin; @@ -148,6 +149,7 @@ impl Optimizer { /// Create a new optimizer using the recommended list of rules pub fn new(config: &OptimizerConfig) -> Self { let mut rules: Vec> = vec![ + Arc::new(InlineTableScan::new()), Arc::new(TypeCoercion::new()), Arc::new(SimplifyExpressions::new()), Arc::new(UnwrapCastInComparison::new()), diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index f6430b87a4a7..d6ed6e4884bc 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -527,7 +527,9 @@ fn optimize_plan( } fn projection_equal(p: &Projection, p2: &Projection) -> bool { - p.expr.len() == p2.expr.len() && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r) + p.expr.len() == p2.expr.len() + && p.alias == p2.alias + && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r) } #[cfg(test)]