Skip to content

Commit

Permalink
ARROW-11395: [DataFusion] Support custom optimizers
Browse files Browse the repository at this point in the history
This PR adds support for injecting custom [logical] optimization rules into the `ExecutionContext`, allowing people to write their own optimizers and configure the context with them, so that they are applied to the logical plan during the optimization step.

Another way of thinking about this is that `QueryPlanner` is now only responsible for converting a logical plan into an execution plan, while the `OptimizerRule` is responsible for rewriting a logical plan into another logical plan:

* `OptimizerRule`: `LogicalPlan -> LogicalPlan`
* `QueryPlanner`: `LogicalPlan -> ExecutionPlan`

The second commit on this PR is just a small simplification that helps people write rules without having to worry about `Explain`. This is important because forgetting to handle it has major consequences for UX (explain does not work as intended).

Closes #9333 from jorgecarleitao/split_optimizer

Authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
  • Loading branch information
jorgecarleitao committed Jan 31, 2021
1 parent f05b49b commit 77a46f2
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 90 deletions.
41 changes: 23 additions & 18 deletions rust/datafusion/src/execution/context.rs
Expand Up @@ -324,19 +324,15 @@ impl ExecutionContext {

/// Optimize the logical plan by applying optimizer rules
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
// Apply standard rewrites and optimizations
let optimizers = &self.state.lock().unwrap().config.optimizers;

let mut new_plan = plan.clone();
debug!("Logical plan:\n {:?}", plan);
let mut plan = ProjectionPushDown::new().optimize(&plan)?;
plan = FilterPushDown::new().optimize(&plan)?;
plan = HashBuildProbeOrder::new().optimize(&plan)?;
for optimizer in optimizers {
new_plan = optimizer.optimize(&new_plan)?;
}
debug!("Optimized logical plan:\n {:?}", plan);

self.state
.lock()
.unwrap()
.config
.query_planner
.rewrite_logical_plan(plan)
Ok(new_plan)
}

/// Create a physical plan from a logical plan
Expand Down Expand Up @@ -454,13 +450,6 @@ impl FunctionRegistry for ExecutionContext {

/// A planner used to add extensions to DataFusion logical and physical plans.
pub trait QueryPlanner {
/// Given a `LogicalPlan`, create a new, modified `LogicalPlan`
/// plan. This method is run after built in `OptimizerRule`s. By
/// default returns the `plan` unmodified.
fn rewrite_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
Ok(plan)
}

/// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
fn create_physical_plan(
&self,
Expand Down Expand Up @@ -491,6 +480,8 @@ pub struct ExecutionConfig {
pub concurrency: usize,
/// Default batch size when reading data sources
pub batch_size: usize,
/// Responsible for optimizing a logical plan
optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
query_planner: Arc<dyn QueryPlanner + Send + Sync>,
}
Expand All @@ -501,6 +492,11 @@ impl ExecutionConfig {
Self {
concurrency: num_cpus::get(),
batch_size: 32768,
optimizers: vec![
Arc::new(ProjectionPushDown::new()),
Arc::new(FilterPushDown::new()),
Arc::new(HashBuildProbeOrder::new()),
],
query_planner: Arc::new(DefaultQueryPlanner {}),
}
}
Expand Down Expand Up @@ -529,6 +525,15 @@ impl ExecutionConfig {
self.query_planner = query_planner;
self
}

/// Adds a new [`OptimizerRule`] to this configuration.
///
/// The rule is appended after the rules that are already registered.
/// Takes `self` by value and returns it, so calls can be chained in
/// builder style.
pub fn add_optimizer_rule(
mut self,
optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
) -> Self {
self.optimizers.push(optimizer_rule);
self
}
}

/// Execution context for registering data sources and executing queries
Expand Down
4 changes: 2 additions & 2 deletions rust/datafusion/src/optimizer/filter_push_down.rs
Expand Up @@ -396,7 +396,7 @@ impl OptimizerRule for FilterPushDown {
"filter_push_down"
}

fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
optimize(plan, State::default())
}
}
Expand Down Expand Up @@ -438,7 +438,7 @@ mod tests {
use arrow::datatypes::SchemaRef;

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let mut rule = FilterPushDown::new();
let rule = FilterPushDown::new();
let optimized_plan = rule.optimize(plan).expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
2 changes: 1 addition & 1 deletion rust/datafusion/src/optimizer/hash_build_probe_order.rs
Expand Up @@ -98,7 +98,7 @@ impl OptimizerRule for HashBuildProbeOrder {
"hash_build_probe_order"
}

fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
match plan {
// Main optimization rule, swaps order of left and right
// based on number of rows in each table
Expand Down
17 changes: 1 addition & 16 deletions rust/datafusion/src/optimizer/optimizer.rs
Expand Up @@ -17,29 +17,14 @@

//! Query optimizer traits

use super::utils;
use crate::error::Result;
use crate::logical_plan::LogicalPlan;

/// An optimizer rule performs a transformation on a logical plan to produce an optimized
/// logical plan.
pub trait OptimizerRule {
/// Perform optimizations on the plan
fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan>;
fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan>;
/// Produce a human readable name for this optimizer rule
fn name(&self) -> &str;

/// Convenience rule for writing optimizers: recursively invoke
/// optimize on plan's children and then return a node of the same
/// type. Useful for optimizer rules which want to leave the type
/// of plan unchanged but still apply to the children.
fn optimize_children(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
let new_exprs = utils::expressions(&plan);
let new_inputs = utils::inputs(&plan)
.into_iter()
.map(|plan| self.optimize(plan))
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &new_exprs, &new_inputs)
}
}
6 changes: 3 additions & 3 deletions rust/datafusion/src/optimizer/projection_push_down.rs
Expand Up @@ -32,7 +32,7 @@ use utils::optimize_explain;
pub struct ProjectionPushDown {}

impl OptimizerRule for ProjectionPushDown {
fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
// set of all columns referred to by the plan (and thus considered required by the root)
let required_columns = plan
.schema()
Expand Down Expand Up @@ -108,7 +108,7 @@ fn get_projected_schema(

/// Recursively traverses the logical plan, removing expressions that are not needed.
fn optimize_plan(
optimizer: &mut ProjectionPushDown,
optimizer: &ProjectionPushDown,
plan: &LogicalPlan,
required_columns: &HashSet<String>, // set of columns required up to this step
has_projection: bool,
Expand Down Expand Up @@ -525,7 +525,7 @@ mod tests {
}

fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
let mut rule = ProjectionPushDown::new();
let rule = ProjectionPushDown::new();
rule.optimize(plan)
}
}
42 changes: 38 additions & 4 deletions rust/datafusion/src/optimizer/utils.rs
Expand Up @@ -95,7 +95,7 @@ pub fn expr_to_column_names(expr: &Expr, accum: &mut HashSet<String>) -> Result<
/// Create a `LogicalPlan::Explain` node by running `optimizer` on the
/// input plan and capturing the resulting plan string
pub fn optimize_explain(
optimizer: &mut impl OptimizerRule,
optimizer: &impl OptimizerRule,
verbose: bool,
plan: &LogicalPlan,
stringified_plans: &Vec<StringifiedPlan>,
Expand All @@ -119,6 +119,40 @@ pub fn optimize_explain(
})
}

/// Helper for optimizer rules that only need to rewrite a plan's inputs.
///
/// Runs `optimizer` on every input of `plan` and rebuilds a node of the
/// same kind from the plan's own expressions and the optimized inputs.
/// A [`LogicalPlan::Explain`] node is not rebuilt this way: it is handed
/// to [`optimize_explain`] instead, so rules built on this helper do not
/// have to special-case it themselves.
///
/// # Errors
///
/// Returns the first error produced by `optimizer.optimize` on an input,
/// or any error reported by `optimize_explain` / `from_plan`.
pub fn optimize_children(
    optimizer: &impl OptimizerRule,
    plan: &LogicalPlan,
) -> Result<LogicalPlan> {
    match plan {
        // Explain nodes are delegated wholesale to `optimize_explain`.
        LogicalPlan::Explain {
            verbose,
            plan: explained,
            stringified_plans,
            schema,
        } => optimize_explain(
            optimizer,
            *verbose,
            &*explained,
            stringified_plans,
            &schema.as_ref().to_owned().into(),
        ),
        // Every other node keeps its expressions and gets optimized inputs.
        _ => {
            let exprs = expressions(plan);

            let mut optimized_inputs = Vec::new();
            for input in inputs(plan) {
                // `?` stops at the first failing input.
                optimized_inputs.push(optimizer.optimize(input)?);
            }

            from_plan(plan, &exprs, &optimized_inputs)
        }
    }
}

/// returns all expressions (non-recursively) in the current logical plan node.
pub fn expressions(plan: &LogicalPlan) -> Vec<Expr> {
match plan {
Expand Down Expand Up @@ -446,7 +480,7 @@ mod tests {
struct TestOptimizer {}

impl OptimizerRule for TestOptimizer {
fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
Ok(plan.clone())
}

Expand All @@ -457,13 +491,13 @@ mod tests {

#[test]
fn test_optimize_explain() -> Result<()> {
let mut optimizer = TestOptimizer {};
let optimizer = TestOptimizer {};

let empty_plan = LogicalPlanBuilder::empty(false).build()?;
let schema = LogicalPlan::explain_schema();

let optimized_explain = optimize_explain(
&mut optimizer,
&optimizer,
true,
&empty_plan,
&vec![StringifiedPlan::new(PlanType::LogicalPlan, "...")],
Expand Down
70 changes: 24 additions & 46 deletions rust/datafusion/tests/user_defined_plan.rs
Expand Up @@ -72,7 +72,7 @@ use datafusion::{
execution::context::ExecutionContextState,
execution::context::QueryPlanner,
logical_plan::{Expr, LogicalPlan, UserDefinedLogicalNode},
optimizer::{optimizer::OptimizerRule, utils::optimize_explain},
optimizer::{optimizer::OptimizerRule, utils::optimize_children},
physical_plan::{
planner::{DefaultPhysicalPlanner, ExtensionPlanner},
Distribution, ExecutionPlan, Partitioning, PhysicalPlanner, RecordBatchStream,
Expand Down Expand Up @@ -178,7 +178,9 @@ async fn topk_plan() -> Result<()> {
}

fn make_topk_context() -> ExecutionContext {
let config = ExecutionConfig::new().with_query_planner(Arc::new(TopKQueryPlanner {}));
let config = ExecutionConfig::new()
.with_query_planner(Arc::new(TopKQueryPlanner {}))
.add_optimizer_rule(Arc::new(TopKOptimizerRule {}));

ExecutionContext::with_config(config)
}
Expand All @@ -188,10 +190,6 @@ fn make_topk_context() -> ExecutionContext {
struct TopKQueryPlanner {}

impl QueryPlanner for TopKQueryPlanner {
fn rewrite_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
TopKOptimizerRule {}.optimize(&plan)
}

/// Given a `LogicalPlan` created from above, create an
/// `ExecutionPlan` suitable for execution
fn create_physical_plan(
Expand All @@ -210,52 +208,32 @@ impl QueryPlanner for TopKQueryPlanner {
struct TopKOptimizerRule {}
impl OptimizerRule for TopKOptimizerRule {
// Example rewrite pass to insert a user defined LogicalPlanNode
fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
match plan {
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it by a TopK node. It does not handle many
// edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
LogicalPlan::Limit { ref n, ref input } => {
if let LogicalPlan::Sort {
ref expr,
ref input,
} = **input
{
if expr.len() == 1 {
// we found a sort with a single sort expr, replace with a TopK
return Ok(LogicalPlan::Extension {
node: Arc::new(TopKPlanNode {
k: *n,
input: self.optimize(input.as_ref())?,
expr: expr[0].clone(),
}),
});
}
fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it by a TopK node. It does not handle many
// edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
if let LogicalPlan::Limit { ref n, ref input } = plan {
if let LogicalPlan::Sort {
ref expr,
ref input,
} = **input
{
if expr.len() == 1 {
// we found a sort with a single sort expr, replace with a TopK
return Ok(LogicalPlan::Extension {
node: Arc::new(TopKPlanNode {
k: *n,
input: self.optimize(input.as_ref())?,
expr: expr[0].clone(),
}),
});
}
}
// Due to the way explain is implemented, in order to get
// explain functionality we need to explicitly handle it
// here.
LogicalPlan::Explain {
verbose,
plan,
stringified_plans,
schema,
} => {
return optimize_explain(
self,
*verbose,
&*plan,
stringified_plans,
&schema.as_ref().to_owned().into(),
)
}
_ => {}
}

// If we didn't find the Limit/Sort combination, recurse as
// normal and build the result.
self.optimize_children(plan)
optimize_children(self, plan)
}

fn name(&self) -> &str {
Expand Down

0 comments on commit 77a46f2

Please sign in to comment.