Skip to content

Commit

Permalink
[IR Runtime] Support removing columns in Record via Auxilia in IR…
Browse files Browse the repository at this point in the history
…-Runtime (#2085)

* [IR Runtime] auxilia remove tags

* minor refine

Co-authored-by: Longbin Lai <longbin.lailb@alibaba-inc.com>
  • Loading branch information
BingqingLyu and longbinlai committed Oct 1, 2022
1 parent e024c65 commit 91db033
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 6 deletions.
28 changes: 24 additions & 4 deletions interactive_engine/executor/ir/core/src/plan/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ impl AsPhysical for pb::Select {
extra: Default::default(),
};
params.predicate = self.predicate.clone();
let auxilia = pb::Auxilia { tag: tag_pb.clone(), params: Some(params), alias: tag_pb };
let auxilia = pb::Auxilia {
tag: tag_pb.clone(),
params: Some(params),
alias: tag_pb,
remove_tags: vec![],
};
pb::logical_plan::Operator::from(auxilia).add_job_builder(builder, plan_meta)
} else {
Err(IrError::MissingData(format!(
Expand Down Expand Up @@ -167,7 +172,7 @@ impl AsPhysical for pb::EdgeExpand {

fn post_process(&mut self, builder: &mut JobBuilder, plan_meta: &mut PlanMeta) -> IrResult<()> {
let mut is_adding_auxilia = false;
let mut auxilia = pb::Auxilia { tag: None, params: None, alias: None };
let mut auxilia = pb::Auxilia { tag: None, params: None, alias: None, remove_tags: vec![] };
if let Some(params) = self.params.as_mut() {
if let Some(node_meta) = plan_meta.get_curr_node_meta() {
let columns = node_meta.get_columns();
Expand Down Expand Up @@ -291,7 +296,7 @@ impl AsPhysical for pb::GetV {

fn post_process(&mut self, builder: &mut JobBuilder, plan_meta: &mut PlanMeta) -> IrResult<()> {
let mut is_adding_auxilia = false;
let mut auxilia = pb::Auxilia { tag: None, params: None, alias: None };
let mut auxilia = pb::Auxilia { tag: None, params: None, alias: None, remove_tags: vec![] };
if let Some(params) = self.params.as_mut() {
if let Some(node_meta) = plan_meta.get_curr_node_meta() {
let columns = node_meta.get_columns();
Expand Down Expand Up @@ -359,7 +364,8 @@ impl AsPhysical for pb::GetV {
impl AsPhysical for pb::As {
fn add_job_builder(&self, builder: &mut JobBuilder, plan_meta: &mut PlanMeta) -> IrResult<()> {
// Transform to `Auxilia` internally.
let auxilia = pb::Auxilia { tag: None, params: None, alias: self.alias.clone() };
let auxilia =
pb::Auxilia { tag: None, params: None, alias: self.alias.clone(), remove_tags: vec![] };
auxilia.add_job_builder(builder, plan_meta)
}
}
Expand Down Expand Up @@ -604,6 +610,7 @@ impl AsPhysical for LogicalPlan {
alias: Some(common_pb::NameOrId {
item: Some(common_pb::name_or_id::Item::Id(new_tag)),
}),
remove_tags: vec![],
}
.into(),
);
Expand Down Expand Up @@ -788,6 +795,7 @@ mod test {
extra: Default::default(),
}),
alias: None,
remove_tags: vec![],
}
}

Expand Down Expand Up @@ -944,6 +952,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["age".into()])),
alias: Some(0.into()),
remove_tags: vec![],
})
.encode_to_vec(),
);
Expand All @@ -954,6 +963,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["age".into()])),
alias: Some(1.into()),
remove_tags: vec![],
})
.encode_to_vec(),
);
Expand All @@ -980,6 +990,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["age".into()])),
alias: Some(0.into()),
remove_tags: vec![],
})
.encode_to_vec(),
);
Expand All @@ -992,6 +1003,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["age".into()])),
alias: Some(1.into()),
remove_tags: vec![],
})
.encode_to_vec(),
);
Expand Down Expand Up @@ -1026,6 +1038,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["age".into(), "id".into(), "name".into()])),
alias: Some(0.into()),
remove_tags: vec![],
})
.encode_to_vec(),
);
Expand Down Expand Up @@ -1053,6 +1066,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["age".into(), "id".into(), "name".into()])),
alias: Some(0.into()),
remove_tags: vec![],
})
.encode_to_vec(),
);
Expand Down Expand Up @@ -1158,6 +1172,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["age".into(), "id".into(), "name".into()])),
alias: Some(0.into()),
remove_tags: vec![],
})
.encode_to_vec(),
);
Expand Down Expand Up @@ -1186,6 +1201,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["age".into(), "id".into(), "name".into()])),
alias: Some(0.into()),
remove_tags: vec![],
})
.encode_to_vec(),
);
Expand Down Expand Up @@ -1248,6 +1264,7 @@ mod test {
extra: Default::default(),
}),
alias: None,
remove_tags: vec![],
})
.encode_to_vec(),
);
Expand Down Expand Up @@ -1660,6 +1677,7 @@ mod test {
extra: Default::default(),
}),
alias: None,
remove_tags: vec![],
};

expected_builder.apply_join(
Expand Down Expand Up @@ -1724,6 +1742,7 @@ mod test {
tag: None,
params: None,
alias: Some(common_pb::NameOrId { item: Some(common_pb::name_or_id::Item::Id(2)) }),
remove_tags: vec![],
}
.into(),
);
Expand Down Expand Up @@ -1802,6 +1821,7 @@ mod test {
tag: None,
params: None,
alias: Some(common_pb::NameOrId { item: Some(common_pb::name_or_id::Item::Id(2)) }),
remove_tags: vec![],
}
.into(),
);
Expand Down
146 changes: 146 additions & 0 deletions interactive_engine/executor/ir/integrated/tests/auxilia_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec![], None)),
alias: Some(TAG_A.into()),
remove_tags: vec![],
};

let conf = JobConf::new("auxilia_simple_alias_test");
Expand Down Expand Up @@ -110,6 +111,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["name".into()], None)),
alias: None,
remove_tags: vec![],
};

let conf = JobConf::new("auxilia_get_property_test");
Expand Down Expand Up @@ -162,6 +164,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["name".into()], None)),
alias: Some(TAG_A.into()),
remove_tags: vec![],
};

let conf = JobConf::new("auxilia_get_property_with_none_tag_input_test");
Expand Down Expand Up @@ -222,6 +225,7 @@ mod test {
str_to_expr_pb("@.name==\"vadas\"".to_string()).ok(),
)),
alias: None,
remove_tags: vec![],
};

let conf = JobConf::new("auxilia_filter_test");
Expand Down Expand Up @@ -278,6 +282,7 @@ mod test {
str_to_expr_pb("@.name==\"vadas\"".to_string()).ok(),
)),
alias: Some(TAG_A.into()),
remove_tags: vec![],
};

let conf = JobConf::new("auxilia_alias_test");
Expand Down Expand Up @@ -317,11 +322,13 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["id".into()], None)),
alias: None,
remove_tags: vec![],
};
let auxilia_opr_2 = pb::Auxilia {
tag: None,
params: Some(query_params(vec![], vec!["name".into()], None)),
alias: None,
remove_tags: vec![],
};

let conf = JobConf::new("auxilia_update_test");
Expand Down Expand Up @@ -378,6 +385,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["name".into()], None)),
alias: None,
remove_tags: vec![],
};

let conf = JobConf::new("auxilia_update_on_lazy_vertex_test");
Expand Down Expand Up @@ -439,6 +447,7 @@ mod test {
tag: None,
params: Some(query_params(vec![], vec!["name".into()], None)),
alias: None,
remove_tags: vec![],
};

let conf = JobConf::new("auxilia_update_on_empty_vertex_test");
Expand Down Expand Up @@ -477,4 +486,141 @@ mod test {
results.sort();
assert_eq!(expected_id_names, results)
}

// g.V().as('a').remove('a') via auxilia with remove_tag
#[test]
fn auxilia_remove_tag_test() {
let auxilia_opr =
pb::Auxilia { tag: None, params: None, alias: None, remove_tags: vec![TAG_A.into()] };

let conf = JobConf::new("auxilia_remove_tag_test");
let mut result = pegasus::run(conf, || {
let auxilia = auxilia_opr.clone();
|input, output| {
let mut stream = input.input_from(source_gen(Some(TAG_A.into())))?;
let filter_map_func = auxilia.gen_filter_map().unwrap();
stream = stream.filter_map(move |input| filter_map_func.exec(input))?;
stream.sink_into(output)
}
})
.expect("build job failure");

let mut result_count = 0;
while let Some(Ok(record)) = result.next() {
assert!(record.get(Some(TAG_A)).is_none());
result_count += 1;
}
assert_eq!(result_count, 6)
}

// g.V().as('a').out().as('b').remove('a', 'b') via auxilia with remove_tag
#[test]
fn auxilia_remove_tags_test() {
let expand_opr_pb = pb::EdgeExpand {
v_tag: None,
direction: 0,
params: None,
expand_opt: 0,
alias: Some(TAG_B.into()),
};

let auxilia_opr = pb::Auxilia {
tag: None,
params: None,
alias: None,
remove_tags: vec![TAG_A.into(), TAG_B.into()],
};

let conf = JobConf::new("auxilia_remove_tags_test");
let mut result = pegasus::run(conf, || {
let auxilia = auxilia_opr.clone();
let expand = expand_opr_pb.clone();
|input, output| {
let mut stream = input.input_from(source_gen(Some(TAG_A.into())))?;
let flatmap_func = expand.gen_flat_map().unwrap();
stream = stream.flat_map(move |input| flatmap_func.exec(input))?;
let filter_map_func = auxilia.gen_filter_map().unwrap();
stream = stream.filter_map(move |input| filter_map_func.exec(input))?;
stream.sink_into(output)
}
})
.expect("build job failure");

let mut result_count = 0;
while let Some(Ok(record)) = result.next() {
assert!(record.get(Some(TAG_A)).is_none());
assert!(record.get(Some(TAG_B)).is_none());
result_count += 1;
}
assert_eq!(result_count, 6)
}

// g.V().as('a').remove('a') via auxilia with tag='a', remove_tag='a'
#[test]
fn auxilia_tag_remove_tag_test() {
let auxilia_opr = pb::Auxilia {
tag: Some(TAG_A.into()),
params: None,
alias: None,
remove_tags: vec![TAG_A.into()],
};

let conf = JobConf::new("auxilia_tag_remove_tag_test");
let mut result = pegasus::run(conf, || {
let auxilia = auxilia_opr.clone();
|input, output| {
let mut stream = input.input_from(source_gen(Some(TAG_A.into())))?;
let filter_map_func = auxilia.gen_filter_map().unwrap();
stream = stream.filter_map(move |input| filter_map_func.exec(input))?;
stream.sink_into(output)
}
})
.expect("build job failure");

let mut result_count = 0;
while let Some(Ok(record)) = result.next() {
assert!(record.get(Some(TAG_A)).is_none());
result_count += 1;
}
assert_eq!(result_count, 6)
}

// g.V().as('a').out().as('b').remove('a') via auxilia with remove_tag
#[test]
fn auxilia_remove_some_tag_test() {
let expand_opr_pb = pb::EdgeExpand {
v_tag: None,
direction: 0,
params: None,
expand_opt: 0,
alias: Some(TAG_B.into()),
};

let auxilia_opr =
pb::Auxilia { tag: None, params: None, alias: None, remove_tags: vec![TAG_A.into()] };

let conf = JobConf::new("auxilia_remove_some_tag_test");
let mut result = pegasus::run(conf, || {
let auxilia = auxilia_opr.clone();
let expand = expand_opr_pb.clone();
|input, output| {
let mut stream = input.input_from(source_gen(Some(TAG_A.into())))?;
let flatmap_func = expand.gen_flat_map().unwrap();
stream = stream.flat_map(move |input| flatmap_func.exec(input))?;
let filter_map_func = auxilia.gen_filter_map().unwrap();
stream = stream.filter_map(move |input| filter_map_func.exec(input))?;
stream.sink_into(output)
}
})
.expect("build job failure");

let mut result_count = 0;
while let Some(Ok(record)) = result.next() {
println!("record {:?}", record);
assert!(record.get(Some(TAG_A)).is_none());
assert!(record.get(Some(TAG_B)).is_some());
result_count += 1;
}
assert_eq!(result_count, 6)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ mod test {
tag: None,
params: None,
alias: Some(common_pb::NameOrId { item: Some(common_pb::name_or_id::Item::Id(0)) }),
remove_tags: vec![],
}
.into(),
);
Expand Down Expand Up @@ -340,7 +341,8 @@ mod test {
alias: None,
};
let vertex_query_param = query_params(vec![], vec![], str_to_expr_pb("@.id == 2".to_string()).ok());
let auxilia_opr_pb = pb::Auxilia { tag: None, params: Some(vertex_query_param), alias: None };
let auxilia_opr_pb =
pb::Auxilia { tag: None, params: Some(vertex_query_param), alias: None, remove_tags: vec![] };

let conf = JobConf::new("expand_getv_test");
let mut result = pegasus::run(conf, || {
Expand Down
3 changes: 3 additions & 0 deletions interactive_engine/executor/ir/proto/algebra.proto
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ message Auxilia {
QueryParams params = 2;
// The alias to give if any, which is especially helpful when the entity now refers to the head of the record
common.NameOrId alias = 3;
// The remove_tags refers to the entities that will not be used any longer, and thus can be removed.
// Specifically, it will remove tags in the end of auxilia. i.e., remove `tag` where auxilia is applied is also allowed.
repeated common.NameOrId remove_tags = 4;
}

message EdgeExpand {
Expand Down
Loading

0 comments on commit 91db033

Please sign in to comment.