Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;

use crate::session::Session;
Expand Down Expand Up @@ -56,8 +57,8 @@ pub trait TableProvider: Sync + Send {
None
}

/// Get the [`LogicalPlan`] of this table, if available
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
/// Get the [`LogicalPlan`] of this table, if available.
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
None
}

Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
mod parquet;

use std::any::Any;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -1648,8 +1649,8 @@ impl TableProvider for DataFrameTableProvider {
self
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.plan)
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
Some(Cow::Borrowed(&self.plan))
}

fn supports_filters_pushdown(
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/cte_worktable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! CteWorkTable implementation used for recursive queries

use std::any::Any;
use std::sync::Arc;
use std::{any::Any, borrow::Cow};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
Expand Down Expand Up @@ -63,7 +63,7 @@ impl TableProvider for CteWorkTable {
self
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
None
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/default_table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! Default TableSource implementation used in DataFusion physical plans

use std::any::Any;
use std::sync::Arc;
use std::{any::Any, borrow::Cow};

use crate::datasource::TableProvider;

Expand Down Expand Up @@ -70,7 +70,7 @@ impl TableSource for DefaultTableSource {
self.table_provider.supports_filters_pushdown(filter)
}

fn get_logical_plan(&self) -> Option<&datafusion_expr::LogicalPlan> {
fn get_logical_plan(&self) -> Option<Cow<datafusion_expr::LogicalPlan>> {
self.table_provider.get_logical_plan()
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! View data source which uses a LogicalPlan as it's input.

use std::{any::Any, sync::Arc};
use std::{any::Any, borrow::Cow, sync::Arc};

use crate::{
error::Result,
Expand Down Expand Up @@ -90,8 +90,8 @@ impl TableProvider for ViewTable {
self
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.logical_plan)
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
Some(Cow::Borrowed(&self.logical_plan))
}

fn schema(&self) -> SchemaRef {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{Expr, LogicalPlan};
use arrow::datatypes::SchemaRef;
use datafusion_common::{Constraints, Result};

use std::any::Any;
use std::{any::Any, borrow::Cow};

/// Indicates how a filter expression is handled by
/// [`TableProvider::scan`].
Expand Down Expand Up @@ -122,7 +122,7 @@ pub trait TableSource: Sync + Send {
}

/// Get the Logical plan of this table provider, if available.
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
None
}

Expand Down
43 changes: 21 additions & 22 deletions datafusion/optimizer/src/analyzer/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Column, Result};
use datafusion_expr::expr::WildcardOptions;
use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, TableScan};
use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder};

/// Analyzed rule that inlines TableScan that provide a [`LogicalPlan`]
/// (DataFrame / ViewTable)
Expand Down Expand Up @@ -56,24 +56,23 @@ fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
match plan {
// Match only on scans without filter / projection / fetch
// Views and DataFrames won't have those added
// during the early stage of planning
LogicalPlan::TableScan(TableScan {
table_name,
source,
projection,
filters,
..
}) if filters.is_empty() && source.get_logical_plan().is_some() => {
let sub_plan = source.get_logical_plan().unwrap();
let projection_exprs = generate_projection_expr(&projection, sub_plan)?;
LogicalPlanBuilder::from(sub_plan.clone())
.project(projection_exprs)?
// Ensures that the reference to the inlined table remains the
// same, meaning we don't have to change any of the parent nodes
// that reference this table.
.alias(table_name)?
.build()
.map(Transformed::yes)
// during the early stage of planning.
LogicalPlan::TableScan(table_scan) if table_scan.filters.is_empty() => {
if let Some(sub_plan) = table_scan.source.get_logical_plan() {
let sub_plan = sub_plan.into_owned();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let projection_exprs =
generate_projection_expr(&table_scan.projection, &sub_plan)?;
LogicalPlanBuilder::from(sub_plan)
.project(projection_exprs)?
// Ensures that the reference to the inlined table remains the
// same, meaning we don't have to change any of the parent nodes
// that reference this table.
.alias(table_scan.table_name)?
.build()
.map(Transformed::yes)
} else {
Ok(Transformed::no(LogicalPlan::TableScan(table_scan)))
}
}
_ => Ok(Transformed::no(plan)),
}
Expand Down Expand Up @@ -104,7 +103,7 @@ fn generate_projection_expr(

#[cfg(test)]
mod tests {
use std::{sync::Arc, vec};
use std::{borrow::Cow, sync::Arc, vec};

use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::test::assert_analyzed_plan_eq;
Expand Down Expand Up @@ -167,8 +166,8 @@ mod tests {
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.plan)
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
Some(Cow::Borrowed(&self.plan))
}
}

Expand Down