Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,7 @@ impl SessionState {
// optimize the child plan, capturing the output of each optimizer
let plan = self.optimizer.optimize(
e.plan.as_ref(),
&optimizer_config,
&mut optimizer_config,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
Expand All @@ -1396,7 +1396,8 @@ impl SessionState {
schema: e.schema.clone(),
}))
} else {
self.optimizer.optimize(plan, &optimizer_config, |_, _| {})
self.optimizer
.optimize(plan, &mut optimizer_config, |_, _| {})
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl OptimizerRule for TopKOptimizerRule {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it by a TopK node. It does not handle many
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl OptimizerRule for CommonSubexprEliminate {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
optimize(plan, optimizer_config)
}
Expand Down Expand Up @@ -708,7 +708,7 @@ mod test {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let optimizer = CommonSubexprEliminate {};
let optimized_plan = optimizer
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl OptimizerRule for EliminateFilter {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(Filter {
Expand Down Expand Up @@ -84,7 +84,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateFilter::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl OptimizerRule for EliminateLimit {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
eliminate_limit(self, &Ancestor::NotRelevant, plan, optimizer_config)
}
Expand All @@ -152,7 +152,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateLimit::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/filter_null_join_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl OptimizerRule for FilterNullJoinKeys {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
match plan {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
Expand Down Expand Up @@ -145,7 +145,7 @@ mod tests {

fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
let rule = FilterNullJoinKeys::default();
rule.optimize(plan, &OptimizerConfig::new())
rule.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan")
}

Expand Down
8 changes: 6 additions & 2 deletions datafusion/optimizer/src/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,11 @@ impl OptimizerRule for FilterPushDown {
"filter_push_down"
}

fn optimize(&self, plan: &LogicalPlan, _: &OptimizerConfig) -> Result<LogicalPlan> {
fn optimize(
&self,
plan: &LogicalPlan,
_: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
optimize(plan, State::default())
}
}
Expand Down Expand Up @@ -666,7 +670,7 @@ mod tests {

fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
let rule = FilterPushDown::new();
rule.optimize(plan, &OptimizerConfig::new())
rule.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan")
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl OptimizerRule for LimitPushDown {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
limit_push_down(self, Ancestor::NotRelevant, plan, optimizer_config)
}
Expand All @@ -358,7 +358,7 @@ mod test {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = LimitPushDown::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
11 changes: 9 additions & 2 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub trait OptimizerRule {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan>;

/// A human readable name for this optimizer rule
Expand All @@ -44,15 +44,22 @@ pub struct OptimizerConfig {
/// Query execution start time that can be used to rewrite expressions such as `now()`
/// to use a literal value instead
pub query_execution_start_time: DateTime<Utc>,
next_id: usize,
}

impl OptimizerConfig {
/// Create optimizer config
pub fn new() -> Self {
Self {
query_execution_start_time: chrono::Utc::now(),
next_id: 0, // useful for generating things like unique subquery aliases
}
}

pub fn next_id(&mut self) -> usize {
self.next_id += 1;
self.next_id
}
}

impl Default for OptimizerConfig {
Expand Down Expand Up @@ -80,7 +87,7 @@ impl Optimizer {
pub fn optimize<F>(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
mut observer: F,
) -> Result<LogicalPlan>
where
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl OptimizerRule for ProjectionPushDown {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
// set of all columns refered by the plan (and thus considered required by the root)
let required_columns = plan
Expand Down Expand Up @@ -1011,6 +1011,6 @@ mod tests {

fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
let rule = ProjectionPushDown::new();
rule.optimize(plan, &OptimizerConfig::new())
rule.optimize(plan, &mut OptimizerConfig::new())
}
}
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/reduce_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl OptimizerRule for ReduceOuterJoin {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
let mut nonnullable_cols: Vec<Column> = vec![];

Expand Down Expand Up @@ -367,7 +367,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = ReduceOuterJoin::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
8 changes: 4 additions & 4 deletions datafusion/optimizer/src/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl OptimizerRule for SimplifyExpressions {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
let mut execution_props = ExecutionProps::new();
execution_props.query_execution_start_time =
Expand Down Expand Up @@ -1545,7 +1545,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = SimplifyExpressions::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down Expand Up @@ -1768,7 +1768,7 @@ mod tests {
let rule = SimplifyExpressions::new();

let err = rule
.optimize(plan, &config)
.optimize(plan, &mut config)
.expect_err("expected optimization to fail");

err.to_string()
Expand All @@ -1783,7 +1783,7 @@ mod tests {
let rule = SimplifyExpressions::new();

let optimized_plan = rule
.optimize(plan, &config)
.optimize(plan, &mut config)
.expect("failed to optimize plan");
return format!("{:?}", optimized_plan);
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &OptimizerConfig,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
optimize(plan)
}
Expand All @@ -221,7 +221,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = SingleDistinctToGroupBy::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");

let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/subquery_filter_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(Filter { predicate, input }) => {
Expand Down Expand Up @@ -207,7 +207,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = SubqueryFilterToJoin::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
assert_eq!(formatted_plan, expected);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::sync::Arc;
pub fn optimize_children(
optimizer: &impl OptimizerRule,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
let new_exprs = plan.expressions();
let new_inputs = plan
Expand Down