Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Avoid copying LogicalPlans / Exprs during OptimizerPasses #9708

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl DataFrame {

/// Consume the DataFrame and produce a physical plan
pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
self.session_state.create_physical_plan(&self.plan).await
self.session_state.create_physical_plan(self.plan).await
}

/// Filter the DataFrame by column. Returns a new DataFrame only containing the
Expand Down Expand Up @@ -989,7 +989,7 @@ impl DataFrame {
/// [`Self::into_optimized_plan`] for more details.
pub fn into_optimized_plan(self) -> Result<LogicalPlan> {
// Optimize the plan first for better UX
self.session_state.optimize(&self.plan)
self.session_state.optimize(self.plan)
}

/// Converts this [`DataFrame`] into a [`TableProvider`] that can be registered
Expand Down Expand Up @@ -1466,7 +1466,7 @@ impl TableProvider for DataFrameTableProvider {
expr = expr.limit(0, Some(l))?
}
let plan = expr.build()?;
state.create_physical_plan(&plan).await
state.create_physical_plan(plan).await
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl TableProvider for ViewTable {
plan = plan.limit(0, Some(limit))?;
}

state.create_physical_plan(&plan.build()?).await
state.create_physical_plan(plan.build()?).await
}
}

Expand Down
46 changes: 30 additions & 16 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ use datafusion_sql::{

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_common::tree_node::Transformed;
use parking_lot::RwLock;
use sqlparser::dialect::dialect_from_str;
use url::Url;
Expand Down Expand Up @@ -530,7 +531,7 @@ impl SessionContext {
} = cmd;

let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone());
let input = self.state().optimize(&input)?;
let input = self.state().optimize(input)?;
let table = self.table(&name).await;
match (if_not_exists, or_replace, table) {
(true, false, Ok(_)) => self.return_empty_dataframe(),
Expand Down Expand Up @@ -1839,13 +1840,22 @@ impl SessionState {
}

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();
pub fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
if let LogicalPlan::Explain(Explain {
verbose,
plan,
mut stringified_plans,
schema,
logical_optimization_succeeded,
}) = plan
{
// TODO this could be a dummy plan
let original_plan = plan.clone(); // keep original plan in case there is an error

// analyze & capture output of each rule
// TODO avoid this copy
let analyzer_result = self.analyzer.execute_and_check(
e.plan.as_ref(),
&plan,
self.options(),
|analyzed_plan, analyzer| {
let analyzer_name = analyzer.name().to_string();
Expand All @@ -1861,10 +1871,10 @@ impl SessionState {
.push(StringifiedPlan::new(plan_type, err.to_string()));

return Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
plan: e.plan.clone(),
verbose,
plan,
stringified_plans,
schema: e.schema.clone(),
schema,
logical_optimization_succeeded: false,
}));
}
Expand All @@ -1877,37 +1887,41 @@ impl SessionState {

// optimize the child plan, capturing the output of each optimizer
let optimized_plan = self.optimizer.optimize(
&analyzed_plan,
analyzed_plan,
self,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans.push(optimized_plan.to_stringified(plan_type));
},
);

let (plan, logical_optimization_succeeded) = match optimized_plan {
Ok(plan) => (Arc::new(plan), true),
Ok(plan) => (Arc::new(plan.data), true),
Err(DataFusionError::Context(optimizer_name, err)) => {
// TODO show explain error
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans
.push(StringifiedPlan::new(plan_type, err.to_string()));
(e.plan.clone(), false)
(original_plan, false)
}
Err(e) => return Err(e),
};

Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
verbose,
plan,
stringified_plans,
schema: e.schema.clone(),
schema,
logical_optimization_succeeded,
}))
} else {
let analyzed_plan =
self.analyzer
.execute_and_check(plan, self.options(), |_, _| {})?;
self.optimizer.optimize(&analyzed_plan, self, |_, _| {})
.execute_and_check(&plan, self.options(), |_, _| {})?;
self.optimizer
.optimize(analyzed_plan, self, |_, _| {})
.map(|t| t.data)
}
}

Expand All @@ -1920,7 +1934,7 @@ impl SessionState {
/// DDL `CREATE TABLE` must be handled by another layer.
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
logical_plan: LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let logical_plan = self.optimize(logical_plan)?;
self.query_planner
Expand Down
Loading
Loading