Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 57 additions & 17 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::utils::{
};
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::{context, plan_err, Column, Result};
use datafusion_expr::logical_plan::{Aggregate, Filter, JoinType, Projection, Subquery};
use datafusion_expr::logical_plan::{Filter, JoinType, Limit, Subquery};
use datafusion_expr::{combine_filters, Expr, LogicalPlan, LogicalPlanBuilder, Operator};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is that if the particular subquery isn't supported by the physical operators, it will error out at that time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is that if the particular subquery isn't supported by the physical operators, it will error out at that time?

Yes, exactly.

use log::debug;
use std::sync::Arc;
Expand Down Expand Up @@ -104,22 +104,29 @@ impl OptimizerRule for ScalarSubqueryToJoin {

if subqueries.is_empty() {
// regular filter, no subquery exists clause here
let optimized_plan = LogicalPlan::Filter(Filter {
return Ok(LogicalPlan::Filter(Filter {
predicate: predicate.clone(),
input: Arc::new(optimized_input),
});
return Ok(optimized_plan);
}));
}

// iterate through all subqueries in predicate, turning each into a join
let mut cur_input = (**input).clone();
for subquery in subqueries {
cur_input = optimize_scalar(
if let Some(optimized_subquery) = optimize_scalar(
&subquery,
&cur_input,
&other_exprs,
optimizer_config,
)?;
)? {
cur_input = optimized_subquery;
} else {
// if we can't handle all of the subqueries then bail for now
return Ok(LogicalPlan::Filter(Filter {
predicate: predicate.clone(),
input: Arc::new(optimized_input),
}));
}
}
Ok(cur_input)
}
Expand Down Expand Up @@ -176,21 +183,47 @@ fn optimize_scalar(
filter_input: &LogicalPlan,
outer_others: &[Expr],
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
) -> Result<Option<LogicalPlan>> {
let subquery = query_info.query.subquery.as_ref();
debug!(
"optimizing:\n{}",
query_info.query.subquery.display_indent()
"optimizing:
{}",
subquery.display_indent()
);
let proj = Projection::try_from_plan(&query_info.query.subquery)
.map_err(|e| context!("scalar subqueries must have a projection", e))?;
let proj = match &subquery {
LogicalPlan::Projection(proj) => proj,
LogicalPlan::Limit(Limit {
skip: 0,
fetch: Some(1),
..
}) => return plan_err!("Scalar subqueries with LIMIT 1 are not yet supported"),
_ => {
// this rule does not support this type of scalar subquery
debug!(
"cannot translate this type of scalar subquery to a join: {}",
subquery.display_indent()
);
return Ok(None);
}
};
let proj = only_or_err(proj.expr.as_slice())
.map_err(|e| context!("exactly one expression should be projected", e))?;
let proj = Expr::Alias(Box::new(proj.clone()), "__value".to_string());
let sub_inputs = query_info.query.subquery.inputs();
let sub_inputs = subquery.inputs();
let sub_input = only_or_err(sub_inputs.as_slice())
.map_err(|e| context!("Exactly one input is expected. Is this a join?", e))?;
let aggr = Aggregate::try_from_plan(sub_input)
.map_err(|e| context!("scalar subqueries must aggregate a value", e))?;

let aggr = match sub_input {
LogicalPlan::Aggregate(aggr) => aggr,
_ => {
// this rule does not support this type of scalar subquery
debug!(
"cannot translate this type of scalar subquery to a join: {}",
subquery.display_indent()
);
return Ok(None);
}
};
let filter = Filter::try_from_plan(&aggr.input).ok();

// if there were filters, we use that logical plan, otherwise the plan from the aggregate
Expand Down Expand Up @@ -286,7 +319,7 @@ fn optimize_scalar(
}
let new_plan = new_plan.build()?;

Ok(new_plan)
Ok(Some(new_plan))
}

struct SubqueryInfo {
Expand Down Expand Up @@ -579,8 +612,15 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;

let expected = r#"scalar subqueries must have a projection"#;
assert_optimizer_err(&ScalarSubqueryToJoin::new(), &plan, expected);
// we expect the plan to be unchanged because this subquery is not supported by this rule
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
Filter: customer.c_custkey = (<subquery>) [c_custkey:Int64, c_name:Utf8]
Subquery: [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
Filter: customer.c_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
TableScan: customer [c_custkey:Int64, c_name:Utf8]"#;

assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected);
Ok(())
}

Expand Down