Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 62 additions & 7 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, Result};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::{
AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource,
};
use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{utils, OptimizerConfig, OptimizerContext, OptimizerRule};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
Expand All @@ -46,10 +47,18 @@ pub fn main() -> Result<()> {
logical_plan.display_indent()
);

// now run the optimizer with our custom rule
let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]);
// run the analyzer with our custom rule
let config = OptimizerContext::default().with_skip_failing_rules(false);
let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?;
let analyzer = Analyzer::with_rules(vec![Arc::new(MyAnalyzerRule {})]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let analyzed_plan = analyzer.execute_and_check(&logical_plan, config.options())?;
println!(
"Analyzed Logical Plan:\n\n{}\n",
analyzed_plan.display_indent()
);

// then run the optimizer with our custom rule
let optimizer = Optimizer::with_rules(vec![Arc::new(MyOptimizerRule {})]);
let optimized_plan = optimizer.optimize(&analyzed_plan, &config, observe)?;
println!(
"Optimized Logical Plan:\n\n{}\n",
optimized_plan.display_indent()
Expand All @@ -66,11 +75,57 @@ fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
)
}

struct MyRule {}
/// An example analyzer rule that changes Int64 literals to UInt64.
///
/// Only literals found inside the predicate of a `Filter` plan node are
/// rewritten; every other plan node is left untouched.
struct MyAnalyzerRule {}

impl AnalyzerRule for MyAnalyzerRule {
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
Self::analyze_plan(plan)
}

fn name(&self) -> &str {
"my_analyzer_rule"
}
}

impl MyAnalyzerRule {
    /// Walks `plan` and rewrites the predicate of every `Filter` node with
    /// [`Self::analyze_expr`]; all other nodes are returned unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error from rewriting the predicate or from
    /// `Filter::try_new` when the rewritten filter is rebuilt.
    fn analyze_plan(plan: LogicalPlan) -> Result<LogicalPlan> {
        plan.transform(&|plan| {
            Ok(match plan {
                LogicalPlan::Filter(filter) => {
                    // `filter` is owned by this arm, so the predicate can be
                    // moved out directly instead of cloning the whole
                    // expression tree.
                    let predicate = Self::analyze_expr(filter.predicate)?;
                    Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
                        predicate,
                        filter.input,
                    )?))
                }
                _ => Transformed::No(plan),
            })
        })
    }

    /// Rewrites every `Int64` literal found in `expr` (including nested
    /// sub-expressions) into a `UInt64` literal; other expressions are kept.
    fn analyze_expr(expr: Expr) -> Result<Expr> {
        expr.transform(&|expr| {
            // closure is invoked for all sub expressions
            Ok(match expr {
                Expr::Literal(ScalarValue::Int64(i)) => {
                    // transform to UInt64
                    // NOTE: `as u64` reinterprets the bits, so a negative
                    // literal wraps around (e.g. -1 becomes u64::MAX). That is
                    // acceptable for this demonstration rule but would not be
                    // for a production rewrite.
                    Transformed::Yes(Expr::Literal(ScalarValue::UInt64(
                        i.map(|i| i as u64),
                    )))
                }
                _ => Transformed::No(expr),
            })
        })
    }
}

/// An example optimizer rule that rewrites BETWEEN expressions into binary comparison expressions
struct MyOptimizerRule {}

impl OptimizerRule for MyRule {
impl OptimizerRule for MyOptimizerRule {
fn name(&self) -> &str {
"my_rule"
"my_optimizer_rule"
}

fn try_optimize(
Expand Down
34 changes: 31 additions & 3 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
use crate::physical_optimizer::pipeline_checker::PipelineChecker;
use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use datafusion_optimizer::OptimizerConfig;
use datafusion_optimizer::{
analyzer::{Analyzer, AnalyzerRule},
OptimizerConfig,
};
use datafusion_sql::planner::object_name_to_table_reference;
use uuid::Uuid;

Expand Down Expand Up @@ -1198,6 +1201,8 @@ impl QueryPlanner for DefaultQueryPlanner {
pub struct SessionState {
/// UUID for the session
session_id: String,
/// Responsible for analyzing and rewriting a logical plan before optimization
analyzer: Analyzer,
/// Responsible for optimizing a logical plan
optimizer: Optimizer,
/// Responsible for optimizing a physical execution plan
Expand Down Expand Up @@ -1336,6 +1341,7 @@ impl SessionState {

SessionState {
session_id,
analyzer: Analyzer::new(),
optimizer: Optimizer::new(),
physical_optimizers,
query_planner: Arc::new(DefaultQueryPlanner {}),
Expand Down Expand Up @@ -1448,6 +1454,15 @@ impl SessionState {
self
}

/// Replaces the session's analyzer with a new one built from `rules`.
///
/// Any analyzer previously stored on this state is discarded. Returns the
/// updated state so that calls can be chained.
pub fn with_analyzer_rules(
    mut self,
    rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
) -> Self {
    let replacement = Analyzer::with_rules(rules);
    self.analyzer = replacement;
    self
}

/// Replace the optimizer rules
pub fn with_optimizer_rules(
mut self,
Expand All @@ -1466,6 +1481,15 @@ impl SessionState {
self
}

/// Appends one [`AnalyzerRule`] to the rules already held by the analyzer.
///
/// Returns the updated state so that calls can be chained.
pub fn add_analyzer_rule(
    mut self,
    analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
) -> Self {
    let existing_rules = &mut self.analyzer.rules;
    existing_rules.push(analyzer_rule);
    self
}

/// Adds a new [`OptimizerRule`]
pub fn add_optimizer_rule(
mut self,
Expand Down Expand Up @@ -1639,9 +1663,12 @@ impl SessionState {
if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();

let analyzed_plan = self
.analyzer
.execute_and_check(e.plan.as_ref(), self.options())?;
// optimize the child plan, capturing the output of each optimizer
let (plan, logical_optimization_succeeded) = match self.optimizer.optimize(
e.plan.as_ref(),
&analyzed_plan,
self,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
Expand All @@ -1667,7 +1694,8 @@ impl SessionState {
logical_optimization_succeeded,
}))
} else {
self.optimizer.optimize(plan, self, |_, _| {})
let analyzed_plan = self.analyzer.execute_and_check(plan, self.options())?;
self.optimizer.optimize(&analyzed_plan, self, |_, _| {})
}
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::analyzer::AnalyzerRule;

/// Rewrites `Count(Expr:Wildcard)` to `Count(Expr:Literal)`.
/// Resolves issue: https://github.com/apache/arrow-datafusion/issues/5473.
#[derive(Default)]
pub struct CountWildcardRule {}

impl CountWildcardRule {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/optimizer/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.

mod count_wildcard_rule;
mod inline_table_scan;
pub(crate) mod type_coercion;
pub mod count_wildcard_rule;
pub mod inline_table_scan;
pub mod type_coercion;

use crate::analyzer::count_wildcard_rule::CountWildcardRule;
use crate::analyzer::inline_table_scan::InlineTableScan;
Expand Down
6 changes: 2 additions & 4 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! Query optimizer traits

use crate::analyzer::Analyzer;
use crate::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::decorrelate_where_exists::DecorrelateWhereExists;
use crate::decorrelate_where_in::DecorrelateWhereIn;
Expand Down Expand Up @@ -156,7 +155,7 @@ impl OptimizerConfig for OptimizerContext {
/// A rule-based optimizer.
#[derive(Clone)]
pub struct Optimizer {
/// All rules to apply
/// All optimizer rules to apply
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

Expand Down Expand Up @@ -264,8 +263,7 @@ impl Optimizer {
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
let options = config.options();
// execute_and_check has it's own timer
let mut new_plan = Analyzer::default().execute_and_check(plan, options)?;
let mut new_plan = plan.clone();

let start_time = Instant::now();

Expand Down
7 changes: 5 additions & 2 deletions datafusion/optimizer/tests/integration-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use chrono::{DateTime, NaiveDateTime, Utc};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource};
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{OptimizerContext, OptimizerRule};
use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::Statement;
use datafusion_sql::sqlparser::dialect::GenericDialect;
Expand Down Expand Up @@ -347,8 +348,10 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
let config = OptimizerContext::new()
.with_skip_failing_rules(false)
.with_query_execution_start_time(now_time);
let analyzer = Analyzer::new();
let optimizer = Optimizer::new();
// optimize the logical plan
// analyze and optimize the logical plan
let plan = analyzer.execute_and_check(&plan, config.options())?;
optimizer.optimize(&plan, &config, &observe)
}

Expand Down