Skip to content

Commit

Permalink
Support SubqueryAlias in optimizer-executor.
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed Nov 21, 2022
1 parent bcd6248 commit 42acada
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 110 deletions.
7 changes: 1 addition & 6 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -969,12 +969,7 @@ impl DefaultPhysicalPlanner {
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
match input.as_ref() {
LogicalPlan::TableScan(..) => {
self.create_initial_plan(input, session_state).await
}
_ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string()))
}
self.create_initial_plan(input, session_state).await
}
LogicalPlan::Limit(Limit { input, skip, fetch, .. }) => {
let input = self.create_initial_plan(input, session_state).await?;
Expand Down
201 changes: 97 additions & 104 deletions datafusion/optimizer/src/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! loaded into memory

use crate::{OptimizerConfig, OptimizerRule};
use arrow::datatypes::{Field, Schema};
use arrow::datatypes::Field;
use arrow::error::Result as ArrowResult;
use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ToDFSchema,
Expand All @@ -34,6 +34,7 @@ use datafusion_expr::{
utils::{expr_to_columns, exprlist_to_columns, find_sort_exprs, from_plan},
Expr,
};
use std::collections::HashMap;
use std::{
collections::{BTreeSet, HashSet},
sync::Arc,
Expand Down Expand Up @@ -72,60 +73,6 @@ impl ProjectionPushDown {
}
}

/// Compute the scan projection (a sorted list of column indexes) and the
/// matching projected schema for a table scan, given the set of columns
/// required by the plan above it.
fn get_projected_schema(
    table_name: Option<&String>,
    schema: &Schema,
    required_columns: &HashSet<Column>,
    has_projection: bool,
) -> Result<(Vec<usize>, DFSchemaRef)> {
    // Once we reach the table scan, the accumulated required-column set can be
    // resolved into column indexes. Columns that do not resolve against the
    // schema (e.g. names produced by an aggregation) are silently skipped.
    // A BTreeSet both removes potential duplicates (e.g. union inputs) and
    // keeps the projection deterministically sorted.
    let mut indexes = BTreeSet::new();
    for column in required_columns {
        let table_matches =
            column.relation.is_none() || column.relation.as_ref() == table_name;
        if table_matches {
            if let Ok(idx) = schema.index_of(&column.name) {
                indexes.insert(idx);
            }
        }
    }

    if indexes.is_empty() {
        if has_projection && !schema.fields().is_empty() {
            // No column was referenced directly (e.g. "SELECT COUNT(1) FROM t"):
            // still read at least one column so the scan produces rows,
            // except when the table has no columns at all.
            indexes.insert(0);
        } else {
            // For a table scan without projection, default to all columns.
            indexes.extend(0..schema.fields().len());
        }
    }

    // Build the projected schema, qualifying each field when a table name
    // is available.
    let projected_fields: Vec<DFField> = indexes
        .iter()
        .map(|&i| {
            let field = schema.fields()[i].clone();
            match table_name {
                Some(qualifier) => DFField::from_qualified(qualifier, field),
                None => DFField::from(field),
            }
        })
        .collect();

    Ok((
        indexes.into_iter().collect(),
        projected_fields.to_dfschema_ref()?,
    ))
}

/// Recursively traverses the logical plan, removing expressions that are not needed.
fn optimize_plan(
_optimizer: &ProjectionPushDown,
Expand Down Expand Up @@ -348,28 +295,8 @@ fn optimize_plan(
}
// scans:
// * remove un-used columns from the scan projection
LogicalPlan::TableScan(TableScan {
table_name,
source,
filters,
fetch: limit,
..
}) => {
let (projection, projected_schema) = get_projected_schema(
Some(table_name),
&source.schema(),
required_columns,
has_projection,
)?;
// return the table scan with projection
Ok(LogicalPlan::TableScan(TableScan {
table_name: table_name.clone(),
source: source.clone(),
projection: Some(projection),
projected_schema,
filters: filters.clone(),
fetch: *limit,
}))
LogicalPlan::TableScan(scan) => {
push_down_scan(scan, &new_required_columns, has_projection)
}
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
Expand Down Expand Up @@ -441,32 +368,16 @@ fn optimize_plan(
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
match input.as_ref() {
LogicalPlan::TableScan(TableScan { table_name, .. }) => {
let new_required_columns = new_required_columns
.iter()
.map(|c| match &c.relation {
Some(q) if q == alias => Column {
relation: Some(table_name.clone()),
name: c.name.clone(),
},
_ => c.clone(),
})
.collect();
let new_inputs = vec![optimize_plan(
_optimizer,
input,
&new_required_columns,
has_projection,
_optimizer_config,
)?];
let expr = vec![];
from_plan(plan, &expr, &new_inputs)
}
_ => Err(DataFusionError::Plan(
"SubqueryAlias should only wrap TableScan".to_string(),
)),
}
let new_required_columns =
replace_alias(required_columns, alias, input.schema());
let child = optimize_plan(
_optimizer,
input,
&new_required_columns,
has_projection,
_optimizer_config,
)?;
from_plan(plan, &plan.expressions(), &[child])
}
// all other nodes: Add any additional columns used by
// expressions in this node to the list of required columns
Expand Down Expand Up @@ -532,11 +443,93 @@ fn projection_equal(p: &Projection, p2: &Projection) -> bool {
&& p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r)
}

/// Rewrite `required_columns` so that columns qualified with `alias` are
/// re-qualified with the underlying input relation instead; columns not
/// qualified by the alias pass through unchanged.
fn replace_alias(
    required_columns: &HashSet<Column>,
    alias: &str,
    input_schema: &DFSchemaRef,
) -> HashSet<Column> {
    // For every field of the aliased input, map its alias-qualified column
    // back to the field's own qualified column.
    let alias_to_input: HashMap<Column, Column> = input_schema
        .fields()
        .iter()
        .map(|field| {
            let original = field.qualified_column();
            let aliased = Column {
                relation: Some(alias.to_owned()),
                name: original.name.clone(),
            };
            (aliased, original)
        })
        .collect();

    required_columns
        .iter()
        .map(|col| match alias_to_input.get(col) {
            Some(original) => original.clone(),
            None => col.clone(),
        })
        .collect()
}

/// Push the accumulated set of required columns down into a `TableScan`,
/// producing a new scan that reads only those columns.
///
/// Columns that do not resolve against the source schema are discarded —
/// some required names are not part of the schema (e.g. when the column
/// derives from an aggregation). The resulting projection is sorted and
/// duplicate-free.
fn push_down_scan(
    scan: &TableScan,
    required_columns: &HashSet<Column>,
    has_projection: bool,
) -> Result<LogicalPlan> {
    // Use BTreeSet to remove potential duplicates (e.g. union) as
    // well as to sort the projection to ensure deterministic behavior.
    let schema = scan.source.schema();
    let mut projection: BTreeSet<usize> = required_columns
        .iter()
        // keep unqualified columns and columns qualified by this table;
        // `map_or` avoids the needless `unwrap()` on the relation
        .filter(|c| c.relation.as_ref().map_or(true, |r| r == &scan.table_name))
        .map(|c| schema.index_of(&c.name))
        .filter_map(ArrowResult::ok)
        .collect();

    if projection.is_empty() {
        if has_projection && !schema.fields().is_empty() {
            // Ensure that we are reading at least one column from the table in
            // case the query does not reference any columns directly such as
            // "SELECT COUNT(1) FROM table", except when the table is empty
            // (no column).
            projection.insert(0);
        } else {
            // For a table scan without projection, default to all columns.
            // Reuse the already-fetched `schema` rather than calling
            // `scan.source.schema()` a second time.
            projection = (0..schema.fields().len()).collect();
        }
    }

    // create the projected schema, qualified by the table name
    let projected_fields: Vec<DFField> = projection
        .iter()
        .map(|i| DFField::from_qualified(&scan.table_name, schema.fields()[*i].clone()))
        .collect();

    let projection = projection.into_iter().collect::<Vec<_>>();
    let projected_schema = projected_fields.to_dfschema_ref()?;

    // return the table scan with projection
    Ok(LogicalPlan::TableScan(TableScan {
        table_name: scan.table_name.clone(),
        source: scan.source.clone(),
        projection: Some(projection),
        projected_schema,
        filters: scan.filters.clone(),
        fetch: scan.fetch,
    }))
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test::*;
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, Schema};
use datafusion_expr::expr::Cast;
use datafusion_expr::{
col, count, lit,
Expand Down

0 comments on commit 42acada

Please sign in to comment.