WIP: Avoid copying LogicalPlans / Exprs during OptimizerPasses #9708

Closed · wants to merge 10 commits

Changes from 3 commits
4 changes: 2 additions & 2 deletions datafusion/core/src/execution/context/mod.rs
@@ -1877,7 +1877,7 @@ impl SessionState {
 
         // optimize the child plan, capturing the output of each optimizer
         let optimized_plan = self.optimizer.optimize(
-            &analyzed_plan,
+            analyzed_plan,
             self,
             |optimized_plan, optimizer| {
                 let optimizer_name = optimizer.name().to_string();
@@ -1907,7 +1907,7 @@ impl SessionState {
         let analyzed_plan =
             self.analyzer
                 .execute_and_check(plan, self.options(), |_, _| {})?;
-        self.optimizer.optimize(&analyzed_plan, self, |_, _| {})
+        self.optimizer.optimize(analyzed_plan, self, |_, _| {})
     }
 }
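Note that these call sites still sit inside methods declared to return `Result<LogicalPlan>`, while `optimize` now returns `Result<Transformed<LogicalPlan>>`, so the WIP presumably needs an adapter along these lines (a hypothetical sketch, not part of this diff):

```rust
// Hypothetical caller-side adapter: run the optimizer by value, then
// discard the `transformed` flag to recover a plain LogicalPlan.
let transformed = self.optimizer.optimize(analyzed_plan, self, |_, _| {})?;
let optimized_plan: LogicalPlan = transformed.data;
```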
15 changes: 15 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
@@ -361,6 +361,12 @@ impl LogicalPlan {
         }
     }
 
+    /// Takes all inputs of this plan, unwrapping them if they are
+    /// not shared
+    pub fn take_inputs(&mut self) -> Vec<LogicalPlan> {
+        todo!()
+    }
+
     /// returns all inputs of this `LogicalPlan` node. Does not
     /// include inputs to inputs, or subqueries.
     pub fn inputs(&self) -> Vec<&LogicalPlan> {
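The `take_inputs` stub is the heart of the copy-avoidance idea: plan children are stored behind `Arc`, so in the common un-shared case they could be moved out without any deep copy. A minimal standalone sketch of that unwrap-or-clone step (an assumption about the intended implementation, not code from this PR):

```rust
use std::sync::Arc;

/// Take ownership of an `Arc`-shared child, cloning only when the child
/// is actually shared with another reference.
fn take_shared<T: Clone>(child: Arc<T>) -> T {
    // `Arc::try_unwrap` succeeds only when the strong count is exactly 1,
    // so the expensive clone happens just for genuinely shared subtrees.
    Arc::try_unwrap(child).unwrap_or_else(|shared| (*shared).clone())
}

fn main() {
    let unique = Arc::new(vec![1, 2, 3]);
    let owned = take_shared(unique); // moved out, no clone
    assert_eq!(owned, vec![1, 2, 3]);

    let shared = Arc::new(vec![4, 5]);
    let _second_ref = Arc::clone(&shared);
    let cloned = take_shared(shared); // still shared, so this one clones
    assert_eq!(cloned, vec![4, 5]);
}
```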
@@ -517,6 +523,15 @@ impl LogicalPlan {
         self.with_new_exprs(self.expressions(), inputs.to_vec())
     }
 
+    /// Returns a new `LogicalPlan` with the given (potentially rewritten)
+    /// inputs
+    pub fn with_new_inputs2(
+        mut self,
+        new_inputs: Vec<LogicalPlan>,
+    ) -> Result<LogicalPlan> {
+        todo!()
+    }
+
     /// Returns a new `LogicalPlan` based on `self` with inputs and
     /// expressions replaced.
     ///
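One plausible stop-gap body for `with_new_inputs2`, mirroring the by-reference `with_new_inputs` shown just above it; this is a guess at where the WIP is headed (sketched as it would sit inside `impl LogicalPlan`), and it still re-collects expressions, so it would not yet eliminate that copy:

```rust
pub fn with_new_inputs2(
    self,
    new_inputs: Vec<LogicalPlan>,
) -> Result<LogicalPlan> {
    // Hypothetical: delegate to the existing `with_new_exprs`, but consume
    // `self` instead of borrowing it. `expressions()` still clones the
    // node's expressions; removing that copy is presumably the next step.
    let exprs = self.expressions();
    self.with_new_exprs(exprs, new_inputs)
}
```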
168 changes: 109 additions & 59 deletions datafusion/optimizer/src/optimizer.rs
@@ -48,10 +48,11 @@ use crate::utils::log_plan;
 use datafusion_common::alias::AliasGenerator;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::instant::Instant;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{not_impl_err, DFSchema, DataFusionError, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 
 use chrono::{DateTime, Utc};
+use datafusion_common::tree_node::Transformed;
 use log::{debug, warn};
 
 /// `OptimizerRule` transforms one [`LogicalPlan`] into another which
@@ -85,6 +86,20 @@ pub trait OptimizerRule {
     fn apply_order(&self) -> Option<ApplyOrder> {
         None
     }
+
+    /// Does this rule support rewriting owned plans (to reduce copying)?
+    fn supports_owned(&self) -> bool {
+        false
+    }
+
+    /// If `supports_owned` returns true, the optimizer calls this method
+    /// instead of `try_optimize`
+    fn try_optimize_owned(
+        &self,
+        _plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
+        not_impl_err!("try_optimize_owned is not implemented for this rule")
+    }
 }
 
 /// Options to control the DataFusion Optimizer.
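The defaulted methods keep every existing rule working: the optimizer consults `supports_owned` and only then dispatches to the owned path. A self-contained sketch of that dispatch contract, using toy stand-ins for DataFusion's types (it mirrors the `optimize_node` change further down this diff):

```rust
#[derive(Clone, Debug, PartialEq)]
struct Plan(&'static str);

struct Transformed<T> {
    data: T,
    transformed: bool,
}

trait Rule {
    /// Opt-in flag, mirroring `OptimizerRule::supports_owned`.
    fn supports_owned(&self) -> bool {
        false
    }
    /// Borrowed path: a changed plan must be newly built (copied).
    fn try_optimize(&self, _plan: &Plan) -> Option<Plan> {
        None
    }
    /// Owned path: an unchanged plan is handed back without copying.
    fn try_optimize_owned(&self, plan: Plan) -> Transformed<Plan> {
        Transformed { data: plan, transformed: false }
    }
}

fn optimize_node<R: Rule>(rule: &R, plan: Plan) -> Transformed<Plan> {
    if rule.supports_owned() {
        rule.try_optimize_owned(plan)
    } else {
        // Bridge the old Option-based contract into Transformed.
        match rule.try_optimize(&plan) {
            Some(new_plan) => Transformed { data: new_plan, transformed: true },
            None => Transformed { data: plan, transformed: false },
        }
    }
}

fn main() {
    struct Noop;
    impl Rule for Noop {}
    let result = optimize_node(&Noop, Plan("scan"));
    assert!(!result.transformed); // the plan moved through untouched
    assert_eq!(result.data, Plan("scan"));
}
```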
@@ -279,10 +294,10 @@ impl Optimizer {
     /// invoking observer function after each call
     pub fn optimize<F>(
         &self,
-        plan: &LogicalPlan,
+        plan: LogicalPlan,
A comment from the PR author is attached to this line:
rewriting the optimizer passes to take in owned LogicalPlan is the key idea

I haven't plumbed it completely through yet, but the approach is looking promising

         config: &dyn OptimizerConfig,
         mut observer: F,
-    ) -> Result<LogicalPlan>
+    ) -> Result<Transformed<LogicalPlan>>
     where
         F: FnMut(&LogicalPlan, &dyn OptimizerRule),
     {
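A tiny illustration of why the owned signature is the key idea (toy code with `Vec<String>` standing in for a plan tree, not from the PR): with a borrowed input, even a no-op rewrite must clone to produce an owned result, while an owned input lets an unchanged plan move straight through:

```rust
// Borrowed input: returning an owned result forces a deep copy.
fn rewrite_borrowed(plan: &[String]) -> Vec<String> {
    plan.to_vec()
}

// Owned input: an unchanged plan is just moved, no copy at all.
fn rewrite_owned(plan: Vec<String>) -> Vec<String> {
    plan
}

fn main() {
    let plan = vec!["Filter".to_string(), "TableScan".to_string()];
    let plan = rewrite_borrowed(&plan); // deep-copies every element
    let plan = rewrite_owned(plan); // just moves the value
    assert_eq!(plan.len(), 2);
}
```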
@@ -291,6 +306,7 @@ impl Optimizer {
 
         let start_time = Instant::now();
 
+        let mut transformed = false;
         let mut previous_plans = HashSet::with_capacity(16);
         previous_plans.insert(LogicalPlanSignature::new(&new_plan));

@@ -299,44 +315,54 @@ impl Optimizer {
             log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
 
             for rule in &self.rules {
+                // if we are skipping failed rules, we need to keep a copy of
+                // the plan in case this rule fails
+                let prev_plan = if options.optimizer.skip_failed_rules {
+                    Some(new_plan.clone())
+                } else {
+                    None
+                };
+
+                let orig_schema = new_plan.schema().clone();
+
                 let result =
-                    self.optimize_recursively(rule, &new_plan, config)
+                    self.optimize_recursively(rule, new_plan, config)
                         .and_then(|plan| {
-                            if let Some(plan) = &plan {
-                                assert_schema_is_the_same(rule.name(), plan, &new_plan)?;
-                            }
+                            assert_has_schema(rule.name(), &orig_schema, &plan.data)?;
                             Ok(plan)
                         });
-                match result {
-                    Ok(Some(plan)) => {
-                        new_plan = plan;
+
+                match (result, prev_plan) {
+                    (Ok(t), _) if t.transformed => {
+                        transformed = true;
+                        new_plan = t.data;
                         observer(&new_plan, rule.as_ref());
                         log_plan(rule.name(), &new_plan);
                     }
-                    Ok(None) => {
+                    (Ok(t), _) => {
+                        new_plan = t.data;
                         observer(&new_plan, rule.as_ref());
                         debug!(
                             "Plan unchanged by optimizer rule '{}' (pass {})",
                             rule.name(),
                             i
                         );
                     }
-                    Err(e) => {
-                        if options.optimizer.skip_failed_rules {
-                            // Note to future readers: if you see this warning it signals a
-                            // bug in the DataFusion optimizer. Please consider filing a ticket
-                            // https://github.com/apache/arrow-datafusion
-                            warn!(
+                    (Err(e), Some(prev_plan)) => {
+                        // Note to future readers: if you see this warning it signals a
+                        // bug in the DataFusion optimizer. Please consider filing a ticket
+                        // https://github.com/apache/arrow-datafusion
+                        warn!(
                             "Skipping optimizer rule '{}' due to unexpected error: {}",
                             rule.name(),
                             e
                         );
-                        } else {
-                            return Err(DataFusionError::Context(
-                                format!("Optimizer rule '{}' failed", rule.name(),),
-                                Box::new(e),
-                            ));
-                        }
-                    }
+                        // restore the plan saved before the failed rule ran
+                        new_plan = prev_plan;
+                    }
+                    (Err(e), None) => {
+                        return Err(DataFusionError::Context(
+                            format!("Optimizer rule '{}' failed", rule.name(),),
+                            Box::new(e),
+                        ));
+                    }
                 }
             }
@@ -354,60 +380,78 @@ impl Optimizer {
         }
         log_plan("Final optimized plan", &new_plan);
         debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
-        Ok(new_plan)
+        Ok(if transformed {
+            Transformed::yes(new_plan)
+        } else {
+            Transformed::no(new_plan)
+        })
     }
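`Transformed::yes` / `Transformed::no` come from `datafusion_common::tree_node`; the pass loop above folds each rule's flag into one overall flag. A standalone sketch of that bookkeeping, using a toy type with the same shape as the real one:

```rust
struct Transformed<T> {
    data: T,
    transformed: bool,
}

impl<T> Transformed<T> {
    fn yes(data: T) -> Self { Self { data, transformed: true } }
    fn no(data: T) -> Self { Self { data, transformed: false } }
}

fn main() {
    // One flag per rule application, mirroring `transformed = true`
    // inside the pass loop above.
    let per_rule_flags = [false, true, false];
    let any_change = per_rule_flags.iter().any(|changed| *changed);
    let result = if any_change {
        Transformed::yes("plan")
    } else {
        Transformed::no("plan")
    };
    assert!(result.transformed);
    assert_eq!(result.data, "plan");
}
```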

     fn optimize_node(
         &self,
         rule: &Arc<dyn OptimizerRule + Send + Sync>,
-        plan: &LogicalPlan,
+        plan: LogicalPlan,
         config: &dyn OptimizerConfig,
-    ) -> Result<Option<LogicalPlan>> {
-        // TODO: future feature: We can do Batch optimize
-        rule.try_optimize(plan, config)
+    ) -> Result<Transformed<LogicalPlan>> {
+        if rule.supports_owned() {
+            rule.try_optimize_owned(plan, config)
+        } else {
+            // TODO: future feature: We can do Batch optimize
+            rule.try_optimize(&plan, config).map(|opt| {
+                if let Some(opt_plan) = opt {
+                    Transformed::yes(opt_plan)
+                } else {
+                    // return original plan
+                    Transformed::no(plan)
+                }
+            })
+        }
     }

     fn optimize_inputs(
         &self,
         rule: &Arc<dyn OptimizerRule + Send + Sync>,
-        plan: &LogicalPlan,
+        mut plan: LogicalPlan,
         config: &dyn OptimizerConfig,
-    ) -> Result<Option<LogicalPlan>> {
-        let inputs = plan.inputs();
-        let result = inputs
-            .iter()
+    ) -> Result<Transformed<LogicalPlan>> {
+        let inputs = plan.take_inputs();
+
+        let new_inputs = inputs
+            .into_iter()
             .map(|sub_plan| self.optimize_recursively(rule, sub_plan, config))
             .collect::<Result<Vec<_>>>()?;
-        if result.is_empty() || result.iter().all(|o| o.is_none()) {
-            return Ok(None);
-        }
-
-        let new_inputs = result
-            .into_iter()
-            .zip(inputs)
-            .map(|(new_plan, old_plan)| match new_plan {
-                Some(plan) => plan,
-                None => old_plan.clone(),
-            })
-            .collect();
+        let transformed = new_inputs.iter().any(|t| t.transformed);
+        let new_inputs = new_inputs.into_iter().map(|t| t.data).collect();
+
+        let plan = plan.with_new_inputs2(new_inputs)?;
 
-        let exprs = plan.expressions();
-        plan.with_new_exprs(exprs, new_inputs).map(Some)
+        Ok(if transformed {
+            Transformed::yes(plan)
+        } else {
+            Transformed::no(plan)
+        })
     }

     /// Use a rule to optimize the whole plan.
     /// If the rule has an `ApplyOrder`, the children are handled here, so the
     /// rule itself does not need to recurse.
     pub fn optimize_recursively(
         &self,
         rule: &Arc<dyn OptimizerRule + Send + Sync>,
-        plan: &LogicalPlan,
+        plan: LogicalPlan,
         config: &dyn OptimizerConfig,
-    ) -> Result<Option<LogicalPlan>> {
+    ) -> Result<Transformed<LogicalPlan>> {
         match rule.apply_order() {
             Some(order) => match order {
                 ApplyOrder::TopDown => {
-                    let optimize_self_opt = self.optimize_node(rule, plan, config)?;
-                    let optimize_inputs_opt = match &optimize_self_opt {
-                        Some(optimized_plan) => {
-                            self.optimize_inputs(rule, optimized_plan, config)?
-                        }
+                    let optimized_plan = self.optimize_node(rule, plan, config)?;
+                    let transformed = optimized_plan.transformed;
+
+                    // TODO make a nicer 'and_then' type API on Transformed
+                    let optimized_plan =
+                        self.optimize_inputs(rule, optimized_plan.data, config)?;
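The TODO above asks for a combinator so the `transformed` flags compose instead of being tracked by hand. One plausible shape for it, written against a stand-in `Transformed` with the same fields (hypothetical at the time of this PR, not an API the crate provided):

```rust
struct Transformed<T> {
    data: T,
    transformed: bool,
}

impl<T> Transformed<T> {
    /// Chain a fallible rewrite, OR-ing the `transformed` flags together.
    fn and_then_transform<E>(
        self,
        f: impl FnOnce(T) -> Result<Transformed<T>, E>,
    ) -> Result<Transformed<T>, E> {
        let next = f(self.data)?;
        Ok(Transformed {
            data: next.data,
            transformed: self.transformed || next.transformed,
        })
    }
}
```

With such a helper, the `TopDown` arm above could collapse to `self.optimize_node(rule, plan, config)?.and_then_transform(|p| self.optimize_inputs(rule, p, config))`.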
@@ -431,22 +475,17 @@
     }
 }

 /// Returns an error if plans have different schemas.
 ///
 /// It ignores metadata and nullability.
-pub(crate) fn assert_schema_is_the_same(
+pub(crate) fn assert_has_schema(
     rule_name: &str,
-    prev_plan: &LogicalPlan,
+    schema: &DFSchema,
     new_plan: &LogicalPlan,
 ) -> Result<()> {
-    let equivalent = new_plan
-        .schema()
-        .equivalent_names_and_types(prev_plan.schema());
+    let equivalent = new_plan.schema().equivalent_names_and_types(schema);
 
     if !equivalent {
         let e = DataFusionError::Internal(format!(
             "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}",
-            prev_plan.schema(),
+            schema,
             new_plan.schema()
         ));
         Err(DataFusionError::Context(
@@ -458,6 +497,17 @@
     }
 }
 
+/// Returns an error if plans have different schemas.
+///
+/// It ignores metadata and nullability.
+pub(crate) fn assert_schema_is_the_same(
+    rule_name: &str,
+    prev_plan: &LogicalPlan,
+    new_plan: &LogicalPlan,
+) -> Result<()> {
+    assert_has_schema(rule_name, prev_plan.schema(), new_plan)
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::{Arc, Mutex};
16 changes: 15 additions & 1 deletion datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -19,7 +19,8 @@
 
 use std::sync::Arc;
 
-use datafusion_common::{DFSchema, DFSchemaRef, Result};
+use datafusion_common::tree_node::Transformed;
+use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result};
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::logical_plan::LogicalPlan;
 use datafusion_expr::simplify::SimplifyContext;
@@ -59,6 +60,19 @@ impl OptimizerRule for SimplifyExpressions {
         execution_props.query_execution_start_time = config.query_execution_start_time();
         Ok(Some(Self::optimize_internal(plan, &execution_props)?))
     }
+
+    fn supports_owned(&self) -> bool {
+        true
+    }
+
+    /// Since `supports_owned` returns true, the optimizer calls this
+    /// method instead of `try_optimize`
+    fn try_optimize_owned(
+        &self,
+        _plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
+        todo!();
+    }
 }
 
 impl SimplifyExpressions {
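The `todo!()` marks where the owned rewrite for this rule would go. For reference, a hedged guess at how that body might eventually look, adapted from the rule's existing `try_optimize` shown above (`optimize_internal` and its `&LogicalPlan` argument are taken on trust from the surrounding diff, not from any later commit):

```rust
fn try_optimize_owned(
    &self,
    plan: LogicalPlan,
    config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
    let mut execution_props = ExecutionProps::new();
    execution_props.query_execution_start_time = config.query_execution_start_time();
    // Reuse the existing internal rewrite; conservatively report the
    // result as transformed until finer-grained change tracking exists.
    Ok(Transformed::yes(Self::optimize_internal(
        &plan,
        &execution_props,
    )?))
}
```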