Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 81 additions & 90 deletions src/optimizer/heuristic/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,6 @@ impl HepGraph {
.collect_vec()
}

pub fn to_opt_expr(&self, start: HepNodeId) -> OptExpr {
let children = self
.children_at(start)
.iter()
.map(|&id| self.to_opt_expr(id))
.collect::<Vec<_>>();
OptExpr::new(
OptExprNode::OperatorRef(self.operator(start).clone()),
children,
)
}

pub fn to_plan_with_index(&self, start_index: HepNodeId) -> LogicalPlan {
let mut root_plan = LogicalPlan {
operator: self.operator(start_index).clone(),
Expand Down Expand Up @@ -222,9 +210,25 @@ mod tests {
use crate::planner::operator::Operator;

#[test]
fn test_graph() -> Result<()> {
fn test_graph_for_plan() -> Result<()> {
let plan = select_sql_run("select * from t1 left join t2 on c1 = c3")?;
let mut graph = HepGraph::new(plan.clone());
let graph = HepGraph::new(plan);

assert!(graph.graph.contains_edge(NodeIndex::new(1), NodeIndex::new(2)));
assert!(graph.graph.contains_edge(NodeIndex::new(1), NodeIndex::new(3)));
assert!(graph.graph.contains_edge(NodeIndex::new(0), NodeIndex::new(1)));

assert_eq!(graph.graph.edge_weight(EdgeIndex::new(0)), Some(&0));
assert_eq!(graph.graph.edge_weight(EdgeIndex::new(1)), Some(&1));
assert_eq!(graph.graph.edge_weight(EdgeIndex::new(2)), Some(&0));

Ok(())
}

#[test]
fn test_graph_add_node() -> Result<()> {
let plan = select_sql_run("select * from t1 left join t2 on c1 = c3")?;
let mut graph = HepGraph::new(plan);

graph.add_node(
HepNodeId::new(1),
Expand All @@ -244,114 +248,101 @@ mod tests {
OptExprNode::OperatorRef(Operator::Dummy)
);

graph.replace_node(HepNodeId::new(5), OptExprNode::OptExpr(OptExprNodeId::new(5)));

assert_eq!(graph.version, 4);

assert!(graph.graph.contains_edge(NodeIndex::new(1), NodeIndex::new(2)));
assert!(graph.graph.contains_edge(NodeIndex::new(1), NodeIndex::new(3)));
assert!(graph.graph.contains_edge(NodeIndex::new(0), NodeIndex::new(1)));
assert!(graph.graph.contains_edge(NodeIndex::new(5), NodeIndex::new(4)));
assert!(graph.graph.contains_edge(NodeIndex::new(1), NodeIndex::new(5)));
assert!(graph.graph.contains_edge(NodeIndex::new(5), NodeIndex::new(6)));
assert_eq!(graph.graph.edge_weight(EdgeIndex::new(0)), Some(&0));
assert_eq!(graph.graph.edge_weight(EdgeIndex::new(1)), Some(&1));
assert_eq!(graph.graph.edge_weight(EdgeIndex::new(2)), Some(&0));

assert_eq!(graph.graph.edge_weight(EdgeIndex::new(3)), Some(&0));
assert_eq!(graph.graph.edge_weight(EdgeIndex::new(4)), Some(&2));
assert_eq!(graph.graph.edge_weight(EdgeIndex::new(5)), Some(&1));
assert_eq!(
graph.graph.node_weight(NodeIndex::new(5)),
Some(&OptExprNode::OptExpr(5))
);
assert_eq!(graph.root_index, NodeIndex::new(0));

let old_root_node = graph.remove_node(
HepNodeId::new(0),
false
).unwrap();
Ok(())
}

graph.remove_node(
HepNodeId::new(5),
true
);
#[test]
fn test_graph_replace_node() -> Result<()> {
let plan = select_sql_run("select * from t1 left join t2 on c1 = c3")?;
let mut graph = HepGraph::new(plan);

assert_eq!(graph.version, 6);
graph.replace_node(HepNodeId::new(1), OptExprNode::OperatorRef(Operator::Dummy));

assert!(graph.graph.contains_edge(NodeIndex::new(1), NodeIndex::new(2)));
assert!(graph.graph.contains_edge(NodeIndex::new(1), NodeIndex::new(3)));
assert_eq!(graph.graph.edge_weight(EdgeIndex::new(0)), Some(&0));
assert_eq!(graph.graph.edge_weight(EdgeIndex::new(1)), Some(&1));
assert_eq!(graph.root_index, NodeIndex::new(1));
assert!(matches!(graph.operator(HepNodeId::new(1)), Operator::Dummy));

let final_plan = graph.to_plan();
Ok(())
}

match final_plan.operator {
Operator::Join(_) => (),
_ => unreachable!("Should be a join operator"),
}
#[test]
fn test_graph_remove_middle_node_by_single() -> Result<()> {
let plan = select_sql_run("select * from t1 left join t2 on c1 = c3")?;
let mut graph = HepGraph::new(plan);

assert_eq!(final_plan.childrens.len(), 2);
graph.remove_node(HepNodeId::new(1), false);

let part_plan = graph.to_plan_with_index(HepNodeId::new(3));
assert_eq!(graph.graph.edge_count(), 2);

match part_plan.operator {
Operator::Scan(_) => (),
_ => unreachable!("Should be a scan operator"),
}
assert!(graph.graph.contains_edge(NodeIndex::new(0), NodeIndex::new(2)));
assert!(graph.graph.contains_edge(NodeIndex::new(0), NodeIndex::new(3)));

assert_eq!(part_plan.childrens.len(), 0);
Ok(())
}

let root_expr = graph.to_opt_expr(HepNodeId::new(1));
#[test]
fn test_graph_remove_middle_node_with_childrens() -> Result<()> {
let plan = select_sql_run("select * from t1 left join t2 on c1 = c3")?;
let mut graph = HepGraph::new(plan);

match root_expr.root.get_operator() {
Operator::Join(_) => (),
_ => unreachable!("Should be a join operator"),
}
graph.remove_node(HepNodeId::new(1), true);

assert_eq!(root_expr.childrens.len(), 2);
assert_eq!(graph.graph.edge_count(), 0);

let part_expr = graph.to_opt_expr(HepNodeId::new(3));
Ok(())
}

match part_expr.root.get_operator() {
Operator::Scan(_) => (),
_ => unreachable!("Should be a scan operator"),
}
#[test]
fn test_graph_swap_node() -> Result<()> {
let plan = select_sql_run("select * from t1 left join t2 on c1 = c3")?;
let mut graph = HepGraph::new(plan);

assert_eq!(part_expr.childrens.len(), 0);
let before_op_0 = graph.operator(HepNodeId::new(0)).clone();
let before_op_1 = graph.operator(HepNodeId::new(1)).clone();

graph.add_root(old_root_node);
graph.swap_node(HepNodeId::new(0), HepNodeId::new(1));

assert_eq!(graph.version, 7);
let op_0 = graph.operator(HepNodeId::new(0));
let op_1 = graph.operator(HepNodeId::new(1));

let re_root_plan = graph.to_plan();
assert_eq!(op_0, &before_op_1);
assert_eq!(op_1, &before_op_0);

match re_root_plan.operator {
Operator::Project(_) => (),
_ => unreachable!("Should be a project operator"),
}
Ok(())
}

#[test]
fn test_graph_add_root() -> Result<()> {
let plan = select_sql_run("select * from t1 left join t2 on c1 = c3")?;
let mut graph = HepGraph::new(plan);

graph.add_root(OptExprNode::OperatorRef(Operator::Dummy));

assert_eq!(re_root_plan.childrens.len(), 1);
assert_eq!(graph.graph.edge_count(), 4);
assert!(graph.graph.contains_edge(NodeIndex::new(4), NodeIndex::new(0)));
assert_eq!(graph.graph.edge_weight(EdgeIndex::new(3)), Some(&0));

Ok(())
}

graph.swap_node(HepNodeId::new(2), HepNodeId::new(3));
#[test]
fn test_graph_to_plan() -> Result<()> {
let plan = select_sql_run("select * from t1 left join t2 on c1 = c3")?;
let graph = HepGraph::new(plan.clone());

assert_eq!(graph.version, 8);
let plan_for_graph = graph.to_plan();

let swap_plan = graph.to_plan();
assert_eq!(plan, plan_for_graph);

match &swap_plan.childrens[0].childrens[0].operator {
Operator::Scan(op) => {
assert_eq!(op.columns[0].referenced_columns()[0].name, "c3");
},
_ => unreachable!("Should be a scan operator"),
}
let plan_by_index = graph.to_plan_with_index(HepNodeId::new(1));

match &swap_plan.childrens[0].childrens[1].operator {
Operator::Scan(op) => {
assert_eq!(op.columns[0].referenced_columns()[0].name, "c1");
},
_ => unreachable!("Should be a scan operator"),
}
assert_eq!(plan.childrens[0], plan_by_index);

Ok(())
}
Expand Down