Skip to content

Commit

Permalink
make plan_from_tables return one plan instead of Vec (#4336)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed Nov 23, 2022
1 parent 03f85f1 commit 72a4487
Showing 1 changed file with 47 additions and 60 deletions.
107 changes: 47 additions & 60 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,16 +632,27 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

fn plan_from_tables(
&self,
from: Vec<TableWithJoins>,
mut from: Vec<TableWithJoins>,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<Vec<LogicalPlan>> {
) -> Result<LogicalPlan> {
match from.len() {
0 => Ok(vec![LogicalPlanBuilder::empty(true).build()?]),
_ => from
.into_iter()
.map(|t| self.plan_table_with_joins(t, ctes, outer_query_schema))
.collect::<Result<Vec<_>>>(),
0 => Ok(LogicalPlanBuilder::empty(true).build()?),
1 => {
let from = from.remove(0);
self.plan_table_with_joins(from, ctes, outer_query_schema)
}
_ => {
let plans = from
.into_iter()
.map(|t| self.plan_table_with_joins(t, ctes, outer_query_schema))
.collect::<Result<Vec<_>>>()?;
let mut left = plans[0].clone();
for right in plans.iter().skip(1) {
left = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
}
Ok(left)
}
}
}

Expand Down Expand Up @@ -944,34 +955,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fn plan_selection(
&self,
selection: Option<SQLExpr>,
plans: Vec<LogicalPlan>,
plan: LogicalPlan,
outer_query_schema: Option<&DFSchema>,
ctes: &mut HashMap<String, LogicalPlan>,
) -> Result<LogicalPlan> {
let cross_join_plan = if plans.len() == 1 {
plans[0].clone()
} else {
let mut left = plans[0].clone();
for right in plans.iter().skip(1) {
left = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
}
left
};
match selection {
Some(predicate_expr) => {
let mut fields = vec![];
let mut metadata = HashMap::new();
for plan in &plans {
fields.extend_from_slice(plan.schema().fields());
metadata.extend(plan.schema().metadata().clone());
}

let mut join_schema = DFSchema::new_with_metadata(fields, metadata)?;
let mut join_schema = (**plan.schema()).clone();
let mut all_schemas: Vec<DFSchemaRef> = vec![];
for plan in plans {
for schema in plan.all_schemas() {
all_schemas.push(schema.clone());
}
for schema in plan.all_schemas() {
all_schemas.push(schema.clone());
}
if let Some(outer) = outer_query_schema {
all_schemas.push(Arc::new(outer.clone()));
Expand All @@ -990,10 +983,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

Ok(LogicalPlan::Filter(Filter::try_new(
filter_expr,
Arc::new(cross_join_plan),
Arc::new(plan),
)?))
}
None => Ok(cross_join_plan),
None => Ok(plan),
}
}

Expand All @@ -1020,45 +1013,39 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}

// process `from` clause
let plans = self.plan_from_tables(select.from, ctes, outer_query_schema)?;
let empty_from = matches!(plans.first(), Some(LogicalPlan::EmptyRelation(_)));
let plan = self.plan_from_tables(select.from, ctes, outer_query_schema)?;
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
// build from schema for unqualifier column ambiguous check
// we should get only one field for this unqualifier column from schema.
let from_schema = {
let mut fields = vec![];
let mut metadata = std::collections::HashMap::new();
for plan in &plans {
if let LogicalPlan::Join(HashJoin {
join_constraint: HashJoinConstraint::Using,
on,
left,
..
}) = plan
{
// For query: select id from t1 join t2 using(id), this is legal.
// We should dedup the fields for cols in using clause.
let mut plan_fields = plan.schema().fields().clone();
for join_cols in on.iter() {
let left_field = left.schema().field_from_column(&join_cols.0)?;
plan_fields.retain(|field| {
field.unqualified_column().name
!= left_field.unqualified_column().name
});
plan_fields.push(left_field.clone());
}
fields.extend_from_slice(&plan_fields);
metadata.extend(plan.schema().metadata().clone());
} else {
fields.extend_from_slice(plan.schema().fields());
metadata.extend(plan.schema().metadata().clone());
let mut fields = plan.schema().fields().clone();

let metadata = plan.schema().metadata().clone();
if let LogicalPlan::Join(HashJoin {
join_constraint: HashJoinConstraint::Using,
ref on,
ref left,
..
}) = plan
{
// For query: select id from t1 join t2 using(id), this is legal.
// We should dedup the fields for cols in using clause.
for join_cols in on.iter() {
let left_field = left.schema().field_from_column(&join_cols.0)?;
fields.retain(|field| {
field.unqualified_column().name
!= left_field.unqualified_column().name
});
fields.push(left_field.clone());
}
}

DFSchema::new_with_metadata(fields, metadata)?
};

// process `where` clause
let plan =
self.plan_selection(select.selection, plans, outer_query_schema, ctes)?;
self.plan_selection(select.selection, plan, outer_query_schema, ctes)?;

// process the SELECT expressions, with wildcards expanded.
let select_exprs = self.prepare_select_exprs(
Expand Down

0 comments on commit 72a4487

Please sign in to comment.