Skip to content

Commit

Permalink
support more than 2 branches to union (#1756)
Browse files Browse the repository at this point in the history
  • Loading branch information
MeloYang05 authored Jun 24, 2022
1 parent faadf9f commit 9abea6d
Showing 1 changed file with 121 additions and 33 deletions.
154 changes: 121 additions & 33 deletions research/query_service/ir/core/src/plan/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
//! limitations under the License.

use std::cell::RefCell;
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::convert::{TryFrom, TryInto};
use std::fmt;
use std::io;
Expand Down Expand Up @@ -210,35 +210,44 @@ impl LogicalPlan {
/// Get the corresponding merge node of the given branch node.
fn get_merge_node(&self, branch_node: NodeType) -> Option<NodeType> {
if branch_node.borrow().children.len() > 1 {
let mut layer = 0;
let mut curr_node_opt = Some(branch_node.clone());
while curr_node_opt.is_some() {
let next_node_id = curr_node_opt
.as_ref()
.unwrap()
.borrow()
.get_first_child();
curr_node_opt = next_node_id.and_then(|id| self.get_node(id));
if let Some(curr_node) = &curr_node_opt {
let curr_ref = curr_node.borrow();
if curr_ref.children.len() > 1 {
layer += curr_ref.children.len();
}
if curr_ref.parents.len() > 1 {
// Every branch node must have a corresponding merge node in a valid plan
if layer > 0 {
layer -= curr_ref.parents.len();
} else {
break;
let root_children: BTreeSet<u32> = branch_node
.borrow()
.children
.iter()
.cloned()
.collect();
let mut node_root_child_map = HashMap::new();
let mut queue = VecDeque::new();
for root_child_id in root_children.iter() {
queue.push_back(*root_child_id);
node_root_child_map.insert(*root_child_id, BTreeSet::from([*root_child_id]));
}
'outer: loop {
if let Some(relaxed_node_id) = queue.pop_front() {
let relaxed_node = self.get_node(relaxed_node_id).unwrap();
let related_root_child_nodes = node_root_child_map
.get(&relaxed_node_id)
.cloned()
.unwrap();
for relaxed_node_child in relaxed_node.borrow().children.iter() {
if !node_root_child_map.contains_key(relaxed_node_child) {
queue.push_back(*relaxed_node_child);
}

let child_related_root_child_nodes = node_root_child_map
.entry(*relaxed_node_child)
.or_insert(BTreeSet::new());
for root_child_node in related_root_child_nodes.iter() {
child_related_root_child_nodes.insert(*root_child_node);
}
if *child_related_root_child_nodes == root_children {
break 'outer self.get_node(*relaxed_node_child);
}
}
} else {
break None;
}
}
if layer == 0 {
curr_node_opt
} else {
None
}
} else {
None
}
Expand Down Expand Up @@ -2729,7 +2738,7 @@ mod test {
// 6
// |
// 7
fn create_logical_plan() -> LogicalPlan {
fn create_logical_plan1() -> LogicalPlan {
let opr = pb::logical_plan::Operator {
opr: Some(pb::logical_plan::operator::Opr::As(pb::As { alias: None })),
};
Expand All @@ -2754,9 +2763,66 @@ mod test {
plan
}

// The plan looks like:
// root
// / | \
// 1 2 3
// \ / /
// 4 /
// \ /
// 5
fn create_logical_plan2() -> LogicalPlan {
let opr = pb::logical_plan::Operator {
opr: Some(pb::logical_plan::operator::Opr::As(pb::As { alias: None })),
};
let mut plan = LogicalPlan::default();
plan.append_operator_as_node(opr.clone(), vec![])
.unwrap(); // root
plan.append_operator_as_node(opr.clone(), vec![0])
.unwrap(); // node 1
plan.append_operator_as_node(opr.clone(), vec![0])
.unwrap(); // node 2
plan.append_operator_as_node(opr.clone(), vec![0])
.unwrap(); // node 3
plan.append_operator_as_node(opr.clone(), vec![1, 2])
.unwrap(); // node 4
plan.append_operator_as_node(opr.clone(), vec![3, 4])
.unwrap(); // node 5
plan
}

// The plan looks like:
// root
// / | \
// 1 2 3
// \ / \ /
// 4 5
// \ /
// 6
fn create_logical_plan3() -> LogicalPlan {
let opr = pb::logical_plan::Operator {
opr: Some(pb::logical_plan::operator::Opr::As(pb::As { alias: None })),
};
let mut plan = LogicalPlan::default();
plan.append_operator_as_node(opr.clone(), vec![])
.unwrap(); // root
plan.append_operator_as_node(opr.clone(), vec![0])
.unwrap(); // node 1
plan.append_operator_as_node(opr.clone(), vec![0])
.unwrap(); // node 2
plan.append_operator_as_node(opr.clone(), vec![0])
.unwrap(); // node 3
plan.append_operator_as_node(opr.clone(), vec![1, 2])
.unwrap(); // node 4
plan.append_operator_as_node(opr.clone(), vec![2, 3])
.unwrap(); // node 5
plan.append_operator_as_node(opr.clone(), vec![4, 5])
.unwrap(); // node 6
plan
}
#[test]
fn get_merge_node() {
let plan = create_logical_plan();
fn test_get_merge_node1() {
let plan = create_logical_plan1();
let merge_node = plan.get_merge_node(plan.get_node(1).unwrap());
assert_eq!(merge_node, plan.get_node(5));
let merge_node = plan.get_merge_node(plan.get_node(0).unwrap());
Expand All @@ -2766,6 +2832,28 @@ mod test {
assert!(merge_node.is_none());
}

#[test]
fn test_get_merge_node2() {
let plan = create_logical_plan2();
let merge_node = plan.get_merge_node(plan.get_node(0).unwrap());
assert_eq!(merge_node, plan.get_node(5));
// Not a branch node
let merge_node = plan.get_merge_node(plan.get_node(2).unwrap());
assert!(merge_node.is_none());
}

#[test]
fn test_get_merge_node3() {
let plan = create_logical_plan3();
let merge_node = plan.get_merge_node(plan.get_node(0).unwrap());
assert_eq!(merge_node, plan.get_node(6));
let merge_node = plan.get_merge_node(plan.get_node(2).unwrap());
assert_eq!(merge_node, plan.get_node(6));
// Not a branch node
let merge_node = plan.get_merge_node(plan.get_node(1).unwrap());
assert!(merge_node.is_none());
}

#[test]
fn merge_branch_plans() {
let opr = pb::logical_plan::Operator {
Expand All @@ -2787,20 +2875,20 @@ mod test {
let subplan2 = LogicalPlan::with_root(Node::new(2, opr.clone()));

plan.append_branch_plans(plan.get_node(0).unwrap(), vec![subplan1, subplan2]);
let mut expected_plan = create_logical_plan();
let mut expected_plan = create_logical_plan1();
expected_plan.remove_node(6);

plan.append_node(Node::new(6, opr.clone()), vec![2, 5])
.unwrap();
plan.append_node(Node::new(7, opr.clone()), vec![6])
.unwrap();

assert_eq!(plan, create_logical_plan());
assert_eq!(plan, create_logical_plan1());
}

#[test]
fn subplan() {
let plan = create_logical_plan();
let plan = create_logical_plan1();
let opr = pb::logical_plan::Operator {
opr: Some(pb::logical_plan::operator::Opr::As(pb::As { alias: None })),
};
Expand Down Expand Up @@ -2832,7 +2920,7 @@ mod test {

#[test]
fn get_branch_plans() {
let plan = create_logical_plan();
let plan = create_logical_plan1();
let (merge_node, subplans) = plan.get_branch_plans(plan.get_node(1).unwrap());
let opr = pb::logical_plan::Operator {
opr: Some(pb::logical_plan::operator::Opr::As(pb::As { alias: None })),
Expand Down

0 comments on commit 9abea6d

Please sign in to comment.