From 42acadafee7ad7bb3a075738e7c42085598c54d5 Mon Sep 17 00:00:00 2001
From: jackwener
Date: Sat, 19 Nov 2022 20:19:06 +0800
Subject: [PATCH] Support `SubqueryAlias` in optimizer-executor.

---
 datafusion/core/src/physical_plan/planner.rs  |   7 +-
 .../optimizer/src/projection_push_down.rs     | 201 +++++++++---------
 2 files changed, 98 insertions(+), 110 deletions(-)

diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index b30b8ca82d1e..683df0d37976 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -969,12 +969,7 @@ impl DefaultPhysicalPlanner {
                     SchemaRef::new(schema.as_ref().to_owned().into()),
                 ))),
                 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
-                    match input.as_ref() {
-                        LogicalPlan::TableScan(..) => {
-                            self.create_initial_plan(input, session_state).await
-                        }
-                        _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string()))
-                    }
+                    self.create_initial_plan(input, session_state).await
                 }
                 LogicalPlan::Limit(Limit { input, skip, fetch, .. }) => {
                     let input = self.create_initial_plan(input, session_state).await?;
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index 5a44247eabc1..916071bd5675 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -19,7 +19,7 @@
 //! loaded into memory
 
 use crate::{OptimizerConfig, OptimizerRule};
-use arrow::datatypes::{Field, Schema};
+use arrow::datatypes::Field;
 use arrow::error::Result as ArrowResult;
 use datafusion_common::{
     Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ToDFSchema,
@@ -34,6 +34,7 @@ use datafusion_expr::{
     utils::{expr_to_columns, exprlist_to_columns, find_sort_exprs, from_plan},
     Expr,
 };
+use std::collections::HashMap;
 use std::{
     collections::{BTreeSet, HashSet},
     sync::Arc,
 };
@@ -72,60 +73,6 @@ impl ProjectionPushDown {
     }
 }
 
-fn get_projected_schema(
-    table_name: Option<&String>,
-    schema: &Schema,
-    required_columns: &HashSet<Column>,
-    has_projection: bool,
-) -> Result<(Vec<usize>, DFSchemaRef)> {
-    // once we reach the table scan, we can use the accumulated set of column
-    // names to construct the set of column indexes in the scan
-    //
-    // we discard non-existing columns because some column names are not part of the schema,
-    // e.g. when the column derives from an aggregation
-    //
-    // Use BTreeSet to remove potential duplicates (e.g. union) as
-    // well as to sort the projection to ensure deterministic behavior
-    let mut projection: BTreeSet<usize> = required_columns
-        .iter()
-        .filter(|c| c.relation.is_none() || c.relation.as_ref() == table_name)
-        .map(|c| schema.index_of(&c.name))
-        .filter_map(ArrowResult::ok)
-        .collect();
-
-    if projection.is_empty() {
-        if has_projection && !schema.fields().is_empty() {
-            // Ensure that we are reading at least one column from the table in case the query
-            // does not reference any columns directly such as "SELECT COUNT(1) FROM table",
-            // except when the table is empty (no column)
-            projection.insert(0);
-        } else {
-            // for table scan without projection, we default to return all columns
-            projection = schema
-                .fields()
-                .iter()
-                .enumerate()
-                .map(|(i, _)| i)
-                .collect::<BTreeSet<usize>>();
-        }
-    }
-
-    // create the projected schema
-    let projected_fields: Vec<DFField> = match table_name {
-        Some(qualifier) => projection
-            .iter()
-            .map(|i| DFField::from_qualified(qualifier, schema.fields()[*i].clone()))
-            .collect(),
-        None => projection
-            .iter()
-            .map(|i| DFField::from(schema.fields()[*i].clone()))
-            .collect(),
-    };
-
-    let projection = projection.into_iter().collect::<Vec<_>>();
-    Ok((projection, projected_fields.to_dfschema_ref()?))
-}
-
 /// Recursively transverses the logical plan removing expressions and that are not needed.
 fn optimize_plan(
     _optimizer: &ProjectionPushDown,
@@ -348,28 +295,8 @@
         }
         // scans:
        // * remove un-used columns from the scan projection
-        LogicalPlan::TableScan(TableScan {
-            table_name,
-            source,
-            filters,
-            fetch: limit,
-            ..
-        }) => {
-            let (projection, projected_schema) = get_projected_schema(
-                Some(table_name),
-                &source.schema(),
-                required_columns,
-                has_projection,
-            )?;
-            // return the table scan with projection
-            Ok(LogicalPlan::TableScan(TableScan {
-                table_name: table_name.clone(),
-                source: source.clone(),
-                projection: Some(projection),
-                projected_schema,
-                filters: filters.clone(),
-                fetch: *limit,
-            }))
+        LogicalPlan::TableScan(scan) => {
+            push_down_scan(scan, &new_required_columns, has_projection)
         }
         LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
             "Unsupported logical plan: Explain must be root of the plan".to_string(),
@@ -441,32 +368,16 @@
             }))
         }
         LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
-            match input.as_ref() {
-                LogicalPlan::TableScan(TableScan { table_name, .. }) => {
-                    let new_required_columns = new_required_columns
-                        .iter()
-                        .map(|c| match &c.relation {
-                            Some(q) if q == alias => Column {
-                                relation: Some(table_name.clone()),
-                                name: c.name.clone(),
-                            },
-                            _ => c.clone(),
-                        })
-                        .collect();
-                    let new_inputs = vec![optimize_plan(
-                        _optimizer,
-                        input,
-                        &new_required_columns,
-                        has_projection,
-                        _optimizer_config,
-                    )?];
-                    let expr = vec![];
-                    from_plan(plan, &expr, &new_inputs)
-                }
-                _ => Err(DataFusionError::Plan(
-                    "SubqueryAlias should only wrap TableScan".to_string(),
-                )),
-            }
+            let new_required_columns =
+                replace_alias(required_columns, alias, input.schema());
+            let child = optimize_plan(
+                _optimizer,
+                input,
+                &new_required_columns,
+                has_projection,
+                _optimizer_config,
+            )?;
+            from_plan(plan, &plan.expressions(), &[child])
         }
         // all other nodes: Add any additional columns used by
         // expressions in this node to the list of required columns
@@ -532,11 +443,93 @@ fn projection_equal(p: &Projection, p2: &Projection) -> bool {
         && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r)
 }
 
+fn replace_alias(
+    required_columns: &HashSet<Column>,
+    alias: &str,
+    input_schema: &DFSchemaRef,
+) -> HashSet<Column> {
+    let mut map = HashMap::new();
+    for field in input_schema.fields() {
+        let col = field.qualified_column();
+        let alias_col = Column {
+            relation: Some(alias.to_owned()),
+            name: col.name.clone(),
+        };
+        map.insert(alias_col, col);
+    }
+    required_columns
+        .iter()
+        .map(|col| map.get(col).unwrap_or(col).clone())
+        .collect::<HashSet<_>>()
+}
+
+fn push_down_scan(
+    scan: &TableScan,
+    required_columns: &HashSet<Column>,
+    has_projection: bool,
+) -> Result<LogicalPlan> {
+    // once we reach the table scan, we can use the accumulated set of column
+    // names to construct the set of column indexes in the scan
+    //
+    // we discard non-existing columns because some column names are not part of the schema,
+    // e.g. when the column derives from an aggregation
+    //
+    // Use BTreeSet to remove potential duplicates (e.g. union) as
+    // well as to sort the projection to ensure deterministic behavior
+    let schema = scan.source.schema();
+    let mut projection: BTreeSet<usize> = required_columns
+        .iter()
+        .filter(|c| {
+            c.relation.is_none() || c.relation.as_ref().unwrap() == &scan.table_name
+        })
+        .map(|c| schema.index_of(&c.name))
+        .filter_map(ArrowResult::ok)
+        .collect();
+
+    if projection.is_empty() {
+        if has_projection && !schema.fields().is_empty() {
+            // Ensure that we are reading at least one column from the table in case the query
+            // does not reference any columns directly such as "SELECT COUNT(1) FROM table",
+            // except when the table is empty (no column)
+            projection.insert(0);
+        } else {
+            // for table scan without projection, we default to return all columns
+            projection = scan
+                .source
+                .schema()
+                .fields()
+                .iter()
+                .enumerate()
+                .map(|(i, _)| i)
+                .collect::<BTreeSet<usize>>();
+        }
+    }
+
+    // create the projected schema
+    let projected_fields: Vec<DFField> = projection
+        .iter()
+        .map(|i| DFField::from_qualified(&scan.table_name, schema.fields()[*i].clone()))
+        .collect();
+
+    let projection = projection.into_iter().collect::<Vec<_>>();
+    let projected_schema = projected_fields.to_dfschema_ref()?;
+
+    // return the table scan with projection
+    Ok(LogicalPlan::TableScan(TableScan {
+        table_name: scan.table_name.clone(),
+        source: scan.source.clone(),
+        projection: Some(projection),
+        projected_schema,
+        filters: scan.filters.clone(),
+        fetch: scan.fetch,
+    }))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::test::*;
-    use arrow::datatypes::DataType;
+    use arrow::datatypes::{DataType, Schema};
     use datafusion_expr::expr::Cast;
     use datafusion_expr::{
         col, count, lit,
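
The core of the optimizer change above is the alias rewrite: when projection push down reaches a SubqueryAlias, any required column qualified by the alias (for example `a.c1` when table `test` is aliased as `a`) is mapped back to the corresponding column of the aliased input before the rule recurses, so the underlying TableScan can still be pruned. The standalone sketch below illustrates only that mapping; the `Column` struct, the function name `rewrite_alias_columns`, and the table and alias names are illustrative stand-ins, not DataFusion's `datafusion_common::Column` or the patch's `replace_alias`.

use std::collections::{HashMap, HashSet};

// Simplified stand-in for an optionally qualified column reference
// such as `a.c1` or `c1` (hypothetical type, for illustration only).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Column {
    relation: Option<String>,
    name: String,
}

// Map columns qualified by `alias` back to the underlying relation,
// leaving all other columns untouched. This mirrors the mapping that the
// patch's `replace_alias` derives from the aliased input's schema.
fn rewrite_alias_columns(
    required_columns: &HashSet<Column>,
    alias: &str,
    base_relation: &str,
    base_columns: &[&str],
) -> HashSet<Column> {
    let map: HashMap<Column, Column> = base_columns
        .iter()
        .map(|name| {
            let aliased = Column {
                relation: Some(alias.to_owned()),
                name: (*name).to_owned(),
            };
            let base = Column {
                relation: Some(base_relation.to_owned()),
                name: (*name).to_owned(),
            };
            (aliased, base)
        })
        .collect();
    required_columns
        .iter()
        .map(|col| map.get(col).unwrap_or(col).clone())
        .collect()
}

fn main() {
    // Columns required above a `SubqueryAlias` for `SELECT a.c1, a.c2 FROM test AS a`.
    let required: HashSet<Column> = ["c1", "c2"]
        .iter()
        .map(|name| Column {
            relation: Some("a".to_owned()),
            name: (*name).to_owned(),
        })
        .collect();

    // After the rewrite the scan sees `test.c1` and `test.c2`, so projection
    // push down can prune `c3` from the underlying TableScan.
    let rewritten = rewrite_alias_columns(&required, "a", "test", &["c1", "c2", "c3"]);
    assert!(rewritten
        .iter()
        .all(|c| c.relation.as_deref() == Some("test")));
    println!("{rewritten:?}");
}

In the patch itself the alias-to-base mapping is built from `input.schema()` via `field.qualified_column()`, so it covers every column of the aliased input rather than a hand-written list, and the rewritten set is what `optimize_plan` recurses with before `push_down_scan` prunes the TableScan.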