Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 37 additions & 32 deletions optd-datafusion-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,40 +256,45 @@ impl OptdQueryPlanner {
.unwrap()
.explain_to_string(if verbose { Some(&meta) } else { None }),
));
let join_order = get_join_order(optimized_rel.clone());
explains.push(StringifiedPlan::new(
PlanType::OptimizedPhysicalPlan {
optimizer_name: "optd-join-order".to_string(),
},
if let Some(join_order) = join_order {
join_order.to_string()
} else {
"None".to_string()
},
));
let bindings = optimizer
.optd_cascades_optimizer()
.get_all_group_bindings(group_id, true);
let mut join_orders = BTreeSet::new();
let mut logical_join_orders = BTreeSet::new();
for binding in bindings {
if let Some(join_order) = get_join_order(binding) {
logical_join_orders.insert(join_order.conv_into_logical_join_order());
join_orders.insert(join_order);

const ENABLE_JOIN_ORDER: bool = false;

if ENABLE_JOIN_ORDER {
let join_order = get_join_order(optimized_rel.clone());
explains.push(StringifiedPlan::new(
PlanType::OptimizedPhysicalPlan {
optimizer_name: "optd-join-order".to_string(),
},
if let Some(join_order) = join_order {
join_order.to_string()
} else {
"None".to_string()
},
));
let bindings = optimizer
.optd_cascades_optimizer()
.get_all_group_bindings(group_id, true);
let mut join_orders = BTreeSet::new();
let mut logical_join_orders = BTreeSet::new();
for binding in bindings {
if let Some(join_order) = get_join_order(binding) {
logical_join_orders.insert(join_order.conv_into_logical_join_order());
join_orders.insert(join_order);
}
}
explains.push(StringifiedPlan::new(
PlanType::OptimizedPhysicalPlan {
optimizer_name: "optd-all-join-orders".to_string(),
},
join_orders.iter().map(|x| x.to_string()).join("\n"),
));
explains.push(StringifiedPlan::new(
PlanType::OptimizedPhysicalPlan {
optimizer_name: "optd-all-logical-join-orders".to_string(),
},
logical_join_orders.iter().map(|x| x.to_string()).join("\n"),
));
}
explains.push(StringifiedPlan::new(
PlanType::OptimizedPhysicalPlan {
optimizer_name: "optd-all-join-orders".to_string(),
},
join_orders.iter().map(|x| x.to_string()).join("\n"),
));
explains.push(StringifiedPlan::new(
PlanType::OptimizedPhysicalPlan {
optimizer_name: "optd-all-logical-join-orders".to_string(),
},
logical_join_orders.iter().map(|x| x.to_string()).join("\n"),
));
}
// println!(
// "{} cost={}",
Expand Down
41 changes: 25 additions & 16 deletions optd-datafusion-repr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use properties::{
};
use rules::{
EliminateDuplicatedAggExprRule, EliminateDuplicatedSortExprRule, EliminateFilterRule,
EliminateJoinRule, EliminateLimitRule, FilterAggTransposeRule, FilterCrossJoinTransposeRule,
FilterInnerJoinTransposeRule, FilterMergeRule, FilterProjectTransposeRule,
FilterSortTransposeRule, HashJoinRule, JoinAssocRule, JoinCommuteRule, PhysicalConversionRule,
ProjectFilterTransposeRule, ProjectMergeRule, ProjectionPullUpJoin, SimplifyFilterRule,
SimplifyJoinCondRule,
EliminateJoinRule, EliminateLimitRule, EliminateProjectRule, FilterAggTransposeRule,
FilterCrossJoinTransposeRule, FilterInnerJoinTransposeRule, FilterMergeRule,
FilterProjectTransposeRule, FilterSortTransposeRule, HashJoinRule, JoinAssocRule,
JoinCommuteRule, PhysicalConversionRule, ProjectMergeRule, ProjectionPullUpJoin,
SimplifyFilterRule, SimplifyJoinCondRule,
};

pub use optd_core::rel_node::Value;
Expand All @@ -46,7 +46,7 @@ mod testing;
// mod expand;

pub struct DatafusionOptimizer {
hueristic_optimizer: HeuristicsOptimizer<OptRelNodeTyp>,
heuristic_optimizer: HeuristicsOptimizer<OptRelNodeTyp>,
cascades_optimizer: CascadesOptimizer<OptRelNodeTyp>,
pub runtime_statistics: RuntimeAdaptionStorage,
enable_adaptive: bool,
Expand Down Expand Up @@ -75,7 +75,7 @@ impl DatafusionOptimizer {
}

pub fn optd_hueristic_optimizer(&self) -> &HeuristicsOptimizer<OptRelNodeTyp> {
&self.hueristic_optimizer
&self.heuristic_optimizer
}

pub fn optd_optimizer_mut(&mut self) -> &mut CascadesOptimizer<OptRelNodeTyp> {
Expand All @@ -99,6 +99,8 @@ impl DatafusionOptimizer {
Arc::new(DepJoinPastAgg::new()),
Arc::new(ProjectMergeRule::new()),
Arc::new(FilterMergeRule::new()),
// disabled due to logical properties are not implemented in heuristics
// Arc::new(EliminateProjectRule::new()),
]
}

Expand All @@ -110,9 +112,10 @@ impl DatafusionOptimizer {
rule_wrappers.push(RuleWrapper::new_cascades(rule));
}
// project transpose rules
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
ProjectFilterTransposeRule::new(),
)));
// only do filter-project one way for now to reduce search space
// rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
// ProjectFilterTransposeRule::new(),
// )));
// add all filter pushdown rules as heuristic rules
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
FilterProjectTransposeRule::new(),
Expand All @@ -129,13 +132,19 @@ impl DatafusionOptimizer {
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
FilterAggTransposeRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(HashJoinRule::new()))); // 17
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinCommuteRule::new()))); // 18
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(HashJoinRule::new())));
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinCommuteRule::new())));
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinAssocRule::new())));
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
ProjectionPullUpJoin::new(),
)));

rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
EliminateProjectRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(ProjectMergeRule::new())));
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
EliminateFilterRule::new(),
)));
rule_wrappers
}

Expand Down Expand Up @@ -167,7 +176,7 @@ impl DatafusionOptimizer {
partial_explore_space: Some(1 << 10),
},
),
hueristic_optimizer: HeuristicsOptimizer::new_with_rules(
heuristic_optimizer: HeuristicsOptimizer::new_with_rules(
heuristic_rules,
ApplyOrder::TopDown, // uhh TODO reconsider
property_builders.clone(),
Expand Down Expand Up @@ -215,7 +224,7 @@ impl DatafusionOptimizer {
cascades_optimizer: optimizer,
enable_adaptive: true,
enable_heuristic: false,
hueristic_optimizer: HeuristicsOptimizer::new_with_rules(
heuristic_optimizer: HeuristicsOptimizer::new_with_rules(
vec![],
ApplyOrder::BottomUp,
Arc::new([]),
Expand All @@ -224,7 +233,7 @@ impl DatafusionOptimizer {
}

pub fn heuristic_optimize(&mut self, root_rel: OptRelNodeRef) -> OptRelNodeRef {
self.hueristic_optimizer
self.heuristic_optimizer
.optimize(root_rel)
.expect("heuristics returns error")
}
Expand Down
2 changes: 1 addition & 1 deletion optd-datafusion-repr/src/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub use physical::PhysicalConversionRule;
pub use project_transpose::{
project_filter_transpose::{FilterProjectTransposeRule, ProjectFilterTransposeRule},
project_join_transpose::ProjectionPullUpJoin,
project_merge::ProjectMergeRule,
project_merge::{EliminateProjectRule, ProjectMergeRule},
};
pub use subquery::{
DepInitialDistinct, DepJoinEliminateAtScan, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::collections::HashMap;
use optd_core::rules::{Rule, RuleMatcher};
use optd_core::{optimizer::Optimizer, rel_node::RelNode};

use crate::plan_nodes::{ExprList, LogicalProjection, OptRelNode, OptRelNodeTyp, PlanNode};
use crate::plan_nodes::{
ColumnRefExpr, ExprList, LogicalProjection, OptRelNode, OptRelNodeTyp, PlanNode,
};
use crate::properties::schema::SchemaPropertyBuilder;
use crate::rules::macros::define_rule;

use super::project_transpose_common::ProjectionMapping;
Expand Down Expand Up @@ -41,6 +44,38 @@ fn apply_projection_merge(
vec![node.into_rel_node().as_ref().clone()]
}

// Proj child [identical columns] -> eliminate
define_rule!(
EliminateProjectRule,
apply_eliminate_project,
(Projection, child, [expr])
);

fn apply_eliminate_project(
optimizer: &impl Optimizer<OptRelNodeTyp>,
EliminateProjectRulePicks { child, expr }: EliminateProjectRulePicks,
) -> Vec<RelNode<OptRelNodeTyp>> {
let exprs = ExprList::from_rel_node(expr.into()).unwrap();
let child_columns = optimizer
.get_property::<SchemaPropertyBuilder>(child.clone().into(), 0)
.len();
if child_columns != exprs.len() {
return Vec::new();
}
for i in 0..exprs.len() {
let child_expr = exprs.child(i);
if child_expr.typ() == OptRelNodeTyp::ColumnRef {
let child_expr = ColumnRefExpr::from_rel_node(child_expr.into_rel_node()).unwrap();
if child_expr.index() != i {
return Vec::new();
}
} else {
return Vec::new();
}
}
vec![child]
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
10 changes: 5 additions & 5 deletions optd-sqlplannertest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ The `explain` and `execute` task will be run with datafusion's logical optimizer

#### Flags

| Name | Description |
| -------------- | --------------------------------------- |
| use_df_logical | Enable Datafusion's logical optimizer |
| verbose | Display estimated cost in physical plan |
| logical_rules | Only enable these logical rules |
| Name | Description |
| -------------- | ------------------------------------------------------------------ |
| use_df_logical | Enable Datafusion's logical optimizer |
| verbose | Display estimated cost in physical plan |
| logical_rules | Only enable these logical rules (also disable heuristic optimizer) |

Currently we have the following options for the explain task:

Expand Down
9 changes: 7 additions & 2 deletions optd-sqlplannertest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl DatafusionDBMS {
for r in 0..rules.len() {
optimizer.enable_rule(r);
}
guard.as_mut().unwrap().enable_heuristic(true);
} else {
for (rule_id, rule) in rules.as_ref().iter().enumerate() {
if rule.rule.is_impl_rule() {
Expand All @@ -127,6 +128,7 @@ impl DatafusionDBMS {
if !rules_to_enable.is_empty() {
bail!("Unknown logical rule: {:?}", rules_to_enable);
}
guard.as_mut().unwrap().enable_heuristic(false);
}
}
let sql = unescape_input(sql)?;
Expand Down Expand Up @@ -335,8 +337,11 @@ fn extract_flags(task: &str) -> Result<TestFlags> {
} else if flag == "use_df_logical" {
options.enable_df_logical = true;
} else if flag.starts_with("logical_rules") {
options.enable_logical_rules =
flag.split('+').skip(1).map(|x| x.to_string()).collect();
if let Some((_, flag)) = flag.split_once(':') {
options.enable_logical_rules = flag.split('+').map(|x| x.to_string()).collect();
} else {
bail!("Failed to parse logical_rules flag: {}", flag);
}
} else if flag == "disable_explore_limit" {
options.disable_explore_limit = true;
} else {
Expand Down
3 changes: 1 addition & 2 deletions optd-sqlplannertest/tests/basic/basic_nodes.planner.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ LogicalLimit { skip: 0(u64), fetch: 1(u64) }
└── LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalScan { table: t1 }
PhysicalLimit { skip: 0(u64), fetch: 1(u64) }
└── PhysicalProjection { exprs: [ #0, #1 ] }
└── PhysicalScan { table: t1 }
└── PhysicalScan { table: t1 }
0 0
0 0
1 1
Expand Down
7 changes: 3 additions & 4 deletions optd-sqlplannertest/tests/basic/cross_product.planner.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalJoin { join_type: Cross, cond: true }
├── LogicalScan { table: t1 }
└── LogicalScan { table: t2 }
PhysicalProjection { exprs: [ #0, #1 ] }
└── PhysicalNestedLoopJoin { join_type: Cross, cond: true }
├── PhysicalScan { table: t1 }
└── PhysicalScan { table: t2 }
PhysicalNestedLoopJoin { join_type: Cross, cond: true }
├── PhysicalScan { table: t1 }
└── PhysicalScan { table: t2 }
0 0
0 1
0 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ select * from t1;
/*
LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalScan { table: t1 }
PhysicalProjection { exprs: [ #0, #1 ] }
└── PhysicalScan { table: t1 }
PhysicalScan { table: t1 }
0 0
1 1
5 2
Expand Down Expand Up @@ -45,8 +44,7 @@ PhysicalSort
│ │ └── #0
│ └── SortOrder { order: Asc }
│ └── #1
└── PhysicalProjection { exprs: [ #0, #1 ] }
└── PhysicalScan { table: t1 }
└── PhysicalScan { table: t1 }
0 0
0 2
1 1
Expand All @@ -61,9 +59,8 @@ select * from t1 group by v1, v2, v1;
LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalAgg { exprs: [], groups: [ #0, #1, #0 ] }
└── LogicalScan { table: t1 }
PhysicalProjection { exprs: [ #0, #1 ] }
└── PhysicalAgg { aggrs: [], groups: [ #0, #1 ] }
└── PhysicalScan { table: t1 }
PhysicalAgg { aggrs: [], groups: [ #0, #1 ] }
└── PhysicalScan { table: t1 }
0 0
1 1
5 2
Expand Down Expand Up @@ -96,9 +93,8 @@ PhysicalSort
│ │ └── #0
│ └── SortOrder { order: Asc }
│ └── #1
└── PhysicalProjection { exprs: [ #0, #1 ] }
└── PhysicalAgg { aggrs: [], groups: [ #0, #1 ] }
└── PhysicalScan { table: t1 }
└── PhysicalAgg { aggrs: [], groups: [ #0, #1 ] }
└── PhysicalScan { table: t1 }
0 0
0 2
1 1
Expand Down
Loading
Loading