Skip to content

Commit

Permalink
feat(interactive): support project properties of a path (#3213)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

As titled. We now support projecting properties of a `path`, i.e., the
requested property is projected from each element in the `path`.

e.g., on modern graph:
```
gremlin> g.V().out("1..3", "knows").with('RESULT_OPT', 'ALL_V').values("name")
==>[marko, vadas]
==>[marko, josh]
```


## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes #3199
  • Loading branch information
BingqingLyu authored Sep 13, 2023
1 parent a805363 commit 8f06334
Show file tree
Hide file tree
Showing 8 changed files with 421 additions and 17 deletions.
11 changes: 10 additions & 1 deletion docs/interactive_engine/tinkerpop/supported_gremlin_steps.md
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ The following steps are extended to denote more complex situations.
In Graph querying, expanding a multiple-hops path from a starting point is called `PathExpand`, which is commonly used in graph scenarios. In addition, there are different requirements for expanding strategies in different scenarios, i.e. it is required to output a simple path or all vertices explored along the expanding path. We introduce the with()-step to configure the corresponding behaviors of the `PathExpand`-step.
#### out()
Expand a multiple-hops path along the outgoing edges, which length is within the given range.
Expand a multiple-hops path along the outgoing edges, which length is within the given range.
Parameters: </br>
lengthRange - the lower and the upper bounds of the path length, </br> edgeLabels - the edge labels to traverse.
Expand All @@ -603,6 +603,9 @@ g.V().out("1..10", "knows")
# expand hops within the range of [1, 10) along the outgoing edges which label is `knows` or `created`,
# vertices can be duplicated and only the end vertex should be kept
g.V().out("1..10", "knows", "created")
# expand hops within the range of [1, 10) along the outgoing edges,
# and project the property "name" of every vertex along the path
g.V().out("1..10").with('RESULT_OPT', 'ALL_V').values("name")
```
Running Example:
```bash
Expand All @@ -615,6 +618,12 @@ gremlin> g.V().out("1..3", "knows").with('RESULT_OPT', 'ALL_V_E')
gremlin> g.V().out("1..3", "knows").with('RESULT_OPT', 'END_V').endV()
==>v[2]
==>v[4]
gremlin> g.V().out("1..3", "knows").with('RESULT_OPT', 'ALL_V').values("name")
==>[marko, vadas]
==>[marko, josh]
gremlin> g.V().out("1..3", "knows").with('RESULT_OPT', 'ALL_V').valueMap("id","name")
==>{id=[[1, 2]], name=[[marko, vadas]]}
==>{id=[[1, 4]], name=[[marko, josh]]}
```
#### in()
Expand a multiple-hops path along the incoming edges, which length is within the given range.
Expand Down
4 changes: 0 additions & 4 deletions interactive_engine/executor/ir/core/src/plan/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1295,10 +1295,6 @@ impl AsLogical for pb::PathExpand {
let tag_id = get_or_set_tag_id(alias, plan_meta)?;
plan_meta.set_tag_nodes(tag_id, vec![plan_meta.get_curr_node()]);
}
// PathExpand would never require adding columns
plan_meta
.curr_node_meta_mut()
.set_columns_opt(ColumnsOpt::None);

Ok(())
}
Expand Down
230 changes: 224 additions & 6 deletions interactive_engine/executor/ir/core/src/plan/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,16 @@ impl AsPhysical for pb::PathExpand {
if range.upper <= range.lower || range.lower < 0 || range.upper <= 0 {
Err(IrError::InvalidRange(range.lower, range.upper))?
}
// post_process for path_expand, including add repartition, and the properties need to cache, if necessary.
let mut path_expand = self.clone();
path_expand.post_process(builder, plan_meta)?;
// PathExpand includes cases of:
// 1) EdgeExpand(Opt=Edge) + GetV(NoFilter),
// 1) EdgeExpand(Opt=Edge) + GetV(NoFilterNorColumn),
// This would be translated into EdgeExpand(Opt=Vertex);
// 2) EdgeExpand(Opt=Edge) + GetV(WithFilter),
// 2) EdgeExpand(Opt=Edge) + GetV(WithFilterOrColumn),
// This would be translated into EdgeExpand(Opt=Vertex) + GetV(Opt=Self);
// 3) EdgeExpand(Opt=Vertex) + GetV(WithFilter and Opt=Self) TODO: would this case exist after match?
// This would remain unchanged.
let mut path_expand = self.clone();
if let Some(expand_base) = path_expand.base.as_mut() {
let edge_expand = expand_base.edge_expand.as_mut();
let getv = expand_base.get_v.as_mut();
Expand Down Expand Up @@ -286,8 +288,6 @@ impl AsPhysical for pb::PathExpand {
edge_expand, getv
)));
}

path_expand.post_process(builder, plan_meta)?;
builder.path_expand(path_expand);

Ok(())
Expand All @@ -297,9 +297,142 @@ impl AsPhysical for pb::PathExpand {
}

/// Post-processes a `PathExpand` before it is added to the physical plan.
///
/// Nothing is done unless the plan is partitioned. On a partitioned plan the data is first
/// shuffled by `start_tag`. If the current node additionally requires columns (or all columns),
/// the required properties are cached:
/// 1. on the path start vertex, via an extra `GetV` (ItSelf) appended to `builder`;
/// 2. on the elements visited while expanding, by rewriting `self.base`:
///    * `EndV`: nothing is cached during the expansion;
///    * `AllV`: the properties of every vertex are cached;
///    * `AllVE`: the properties of every vertex and every edge are cached.
///
/// For example, `g.V().out("1..3").with("RESULT_OPT", "ALL_V").values("name")` needs the
/// property "name" cached on all the vertices in the path.
///
/// # Errors
/// * `IrError::ParsePbError` if `result_opt` is not a known `ResultOpt` value, or if the base
///   has no `GetV` while its `EdgeExpand` does not expand to vertices;
/// * `IrError::MissingData` if `base` or `base.edge_expand` is absent.
fn post_process(&mut self, builder: &mut PlanBuilder, plan_meta: &mut PlanMeta) -> IrResult<()> {
    /// Makes `target` select the columns of `cached`: an existing `target` keeps its other
    /// settings and only has its column selection overwritten, an absent one becomes a copy
    /// of `cached`.
    fn cache_columns(target: &mut Option<pb::QueryParams>, cached: &pb::QueryParams) {
        match target.as_mut() {
            Some(params) => {
                params.columns = cached.columns.clone();
                params.is_all_columns = cached.is_all_columns;
            }
            None => *target = Some(cached.clone()),
        }
    }

    /// Builds the `GetV` (ItSelf) that caches `params` on the vertices produced by `edge_expand`.
    fn self_get_v(edge_expand: &pb::EdgeExpand, params: &pb::QueryParams) -> pb::GetV {
        pb::GetV {
            tag: None,
            opt: 4, // ItSelf
            params: Some(params.clone()),
            alias: edge_expand.alias.clone(),
            meta_data: edge_expand.meta_data.clone(),
        }
    }

    if !plan_meta.is_partition() {
        return Ok(());
    }
    builder.shuffle(self.start_tag.clone());

    let node_meta = match plan_meta.get_curr_node_meta() {
        Some(node_meta) => node_meta,
        None => return Ok(()),
    };
    let columns = node_meta.get_columns();
    let is_all_columns = node_meta.is_all_columns();
    if columns.is_empty() && !is_all_columns {
        // No property of the path is needed, hence nothing to cache.
        return Ok(());
    }

    let new_params = pb::QueryParams {
        tables: vec![],
        columns: columns
            .clone()
            .into_iter()
            .map(|column| column.into())
            .collect(),
        is_all_columns,
        limit: None,
        predicate: None,
        sample_ratio: 1.0,
        extra: Default::default(),
    };

    // First, cache properties on the path start vertex.
    builder.get_v(pb::GetV {
        tag: self.start_tag.clone(),
        opt: 4, // ItSelf
        params: Some(new_params.clone()),
        alias: self.start_tag.clone(),
        meta_data: None,
    });

    // Then, cache properties during the path expanding.
    // The raw `i32` is compared against the known variants rather than transmuted into the
    // enum, so that an unknown value is reported as an error.
    let result_opt = self.result_opt;
    let is_end_v = result_opt == pb::path_expand::ResultOpt::EndV as i32;
    let is_all_v = result_opt == pb::path_expand::ResultOpt::AllV as i32;
    let is_all_v_e = result_opt == pb::path_expand::ResultOpt::AllVE as i32;
    if !(is_end_v || is_all_v || is_all_v_e) {
        return Err(IrError::ParsePbError(
            format!("Unexpected ResultOpt in PathExpand {:?}", result_opt).into(),
        ));
    }

    let expand_base = self
        .base
        .as_mut()
        .ok_or_else(|| IrError::MissingData("PathExpand::base".to_string()))?;
    let edge_expand = expand_base
        .edge_expand
        .as_mut()
        .ok_or_else(|| IrError::MissingData("PathExpand::base.edge_expand".to_string()))?;

    if is_end_v {
        // Only the end vertex is kept, so nothing has to be cached along the path.
        return Ok(());
    }

    // ALL_V and ALL_V_E both need the properties of the vertices in the path.
    match expand_base.get_v.as_mut() {
        // case 1: expand (edge) + getv, then cache properties in getv.
        Some(getv) => cache_columns(&mut getv.params, &new_params),
        // case 2: expand (vertex) + no getv, then cache properties with an extra getv (self).
        None => {
            if edge_expand.expand_opt != pb::edge_expand::ExpandOpt::Vertex as i32 {
                return Err(IrError::ParsePbError(
                    format!("Unexpected ExpandBase in PathExpand {:?}", expand_base).into(),
                ));
            }
            expand_base.get_v = Some(self_get_v(edge_expand, &new_params));
        }
    }

    // If "RESULT_OPT" is "ALL_V_E", we assume the properties of the edges in the path are
    // needed as well, and cache them in the edge expansion.
    if is_all_v_e {
        cache_columns(&mut edge_expand.params, &new_params);
    }

    Ok(())
}
Expand Down Expand Up @@ -2917,4 +3050,89 @@ mod test {

assert_eq!(builder, expected_builder);
}

/// Checks the physical plan built for `Scan -> PathExpand (ALL_V) -> Project(@.name)`,
/// first on a non-partitioned plan and then on a partitioned one.
#[test]
fn path_expand_project_as_physical() {
    let scan = pb::Scan {
        scan_opt: 0,
        alias: None,
        params: Some(query_params(vec!["person".into()], vec![])),
        idx_predicate: None,
        meta_data: None,
    };

    let knows_expand = pb::EdgeExpand {
        v_tag: None,
        direction: 0,
        params: Some(query_params(vec!["knows".into()], vec![])),
        expand_opt: 0, // vertex
        alias: None,
        meta_data: None,
    };

    let path = pb::PathExpand {
        base: Some(knows_expand.into()),
        start_tag: None,
        alias: None,
        hop_range: Some(pb::Range { lower: 1, upper: 4 }),
        path_opt: 0,   // ARBITRARY
        result_opt: 1, // ALL_V
        condition: None,
    };

    let project = pb::Project {
        mappings: vec![ExprAlias {
            expr: Some(str_to_expr_pb("@.name".to_string()).unwrap()),
            alias: None,
        }],
        is_append: true,
        meta_data: vec![],
    };

    // Logical plan: scan (node 0) -> path expand (node 1) -> project (node 2).
    let mut plan = LogicalPlan::with_node(Node::new(0, scan.clone().into()));
    plan.append_operator_as_node(path.clone().into(), vec![0])
        .unwrap();
    plan.append_operator_as_node(project.clone().into(), vec![1])
        .unwrap();

    // Without partition: the operators are expected to be emitted unchanged.
    {
        let mut actual = PlanBuilder::default();
        let mut meta = plan.get_meta().clone();
        plan.add_job_builder(&mut actual, &mut meta)
            .unwrap();

        let mut expected = PlanBuilder::default();
        expected.add_scan_source(scan.clone());
        expected.path_expand(path.clone());
        expected.project(project.clone());

        assert_eq!(actual, expected);
    }

    // With partition: `PathExpand(out("knows"))` is expected to become
    // `auxilia("name") + PathExpand()` whose ExpandBase is `out("knows") + auxilia("name")`.
    {
        let mut actual = PlanBuilder::default();
        let mut meta = plan.get_meta().clone().with_partition();
        plan.add_job_builder(&mut actual, &mut meta)
            .unwrap();

        let name_auxilia = || build_auxilia_with_tag_alias_columns(None, None, vec!["name".into()]);

        let mut cached_path = path.clone();
        cached_path.base.as_mut().unwrap().get_v = Some(name_auxilia());

        let mut expected = PlanBuilder::default();
        expected.add_scan_source(scan);
        // Post process for path expand: 1. shuffle and cache the properties of the path start
        // vertex; 2. expand the path with a base that caches the properties of each vertex.
        expected.shuffle(None);
        expected.get_v(name_auxilia());
        expected.path_expand(cached_path);
        // Post process for project.
        expected.shuffle(None);
        expected.get_v(build_auxilia_with_tag_alias_columns(None, None, vec![]));
        expected.project(project);

        assert_eq!(actual, expected);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use pegasus_common::downcast::AsAny;
use pegasus_common::impl_as_any;

use crate::apis::{Edge, Element, GraphElement, PropertyValue, Vertex, ID};
use crate::utils::expr::eval::Context;

#[derive(Clone, Debug, Hash, PartialEq, PartialOrd)]
pub enum VertexOrEdge {
Expand Down Expand Up @@ -219,6 +220,12 @@ impl GraphElement for VertexOrEdge {
}
}

/// Lets a single `VertexOrEdge` act as its own evaluation context.
impl Context<VertexOrEdge> for VertexOrEdge {
    /// Returns the element itself, whatever `_tag` is given.
    fn get(&self, _tag: Option<&NameOrId>) -> Option<&VertexOrEdge> {
        // `self` is already a reference; borrowing it again is unnecessary.
        Some(self)
    }
}

impl Element for GraphPath {
fn as_graph_element(&self) -> Option<&dyn GraphElement> {
Some(self)
Expand Down Expand Up @@ -256,11 +263,34 @@ impl GraphElement for GraphPath {
}

/// Returns the property `key` of this path.
///
/// * For `AllPath`/`SimpleAllPath`, the result is an owned `Object::Vector` holding the value
///   of `key` for every element of the path that has this property, in path order. Elements
///   without the property are skipped, so the vector may be shorter than the path (or empty).
///   `None` is returned if a found value cannot be turned into an owned object.
/// * For `EndV`/`SimpleEndV`, the result is the property of the kept element.
fn get_property(&self, key: &NameOrId) -> Option<PropertyValue> {
    match self {
        GraphPath::AllPath(path) | GraphPath::SimpleAllPath(path) => {
            let mut properties = vec![];
            for v_or_e in path {
                if let Some(p) = v_or_e.get_property(key) {
                    // Give up with `None` rather than panicking on a value that cannot be owned.
                    properties.push(p.try_to_owned()?);
                }
            }
            Some(PropertyValue::Owned(Object::Vector(properties)))
        }

        GraphPath::EndV((v_or_e, _)) | GraphPath::SimpleEndV((v_or_e, _, _)) => {
            v_or_e.get_property(key)
        }
    }
}

/// Returns all the properties of this path.
///
/// * For `AllPath`/`SimpleAllPath`, this is not supported yet and `None` is returned.
/// * For `EndV`/`SimpleEndV`, the result is all the properties of the kept element.
fn get_all_properties(&self) -> Option<HashMap<NameOrId, Object>> {
    match self {
        // not supported yet.
        GraphPath::AllPath(_) | GraphPath::SimpleAllPath(_) => None,

        GraphPath::EndV((v_or_e, _)) | GraphPath::SimpleEndV((v_or_e, _, _)) => {
            v_or_e.get_all_properties()
        }
    }
}
}

Expand Down
Loading

0 comments on commit 8f06334

Please sign in to comment.