diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/ffi/Utils.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/ffi/Utils.java index c607cf580926..ec05223f0b1b 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/ffi/Utils.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/ffi/Utils.java @@ -112,11 +112,13 @@ public static final PathOpt ffiPathOpt(GraphOpt.PathExpandPath opt) { public static final ResultOpt ffiResultOpt(GraphOpt.PathExpandResult opt) { switch (opt) { - case EndV: + case END_V: return ResultOpt.EndV; - case AllV: - default: + case ALL_V: return ResultOpt.AllV; + case ALL_V_E: + default: + return ResultOpt.AllVE; } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/config/GraphOpt.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/config/GraphOpt.java index be3c0612ac75..00464dcc20ba 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/config/GraphOpt.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/config/GraphOpt.java @@ -49,7 +49,8 @@ public enum PathExpandPath { } public enum PathExpandResult { - EndV, - AllV + END_V, + ALL_V, + ALL_V_E } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/config/PathExpandConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/config/PathExpandConfig.java index c8f7eb42696a..5fcb3d7e8cee 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/config/PathExpandConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/config/PathExpandConfig.java @@ -134,7 +134,7 @@ protected Builder(GraphBuilder innerBuilder) { (GraphOptCluster) innerBuilder.getCluster(), innerBuilder.getRelOptSchema()); this.pathOpt = GraphOpt.PathExpandPath.ARBITRARY; - this.resultOpt = GraphOpt.PathExpandResult.EndV; + this.resultOpt = GraphOpt.PathExpandResult.END_V; } public Builder expand(ExpandConfig config) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/jna/type/ResultOpt.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/jna/type/ResultOpt.java index 895179007575..47964c3822af 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/jna/type/ResultOpt.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/jna/type/ResultOpt.java @@ -20,7 +20,8 @@ public enum ResultOpt implements IntEnum { EndV, - AllV; + AllV, + AllVE; @Override public int getInt() { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/step/PathExpandStep.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/step/PathExpandStep.java index 7f7372dea794..5e87bba24d28 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/step/PathExpandStep.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/step/PathExpandStep.java @@ -97,13 +97,14 @@ public void configure(final Object... keyValues) { + " insensitive)"); } } else if (key.equals("ResultOpt")) { - if (value.equals("AllV") || value.equals("EndV")) { + if (value.equals("AllV") || value.equals("EndV") || value.equals("AllVE")) { this.resultOpt = ResultOpt.valueOf(value); } else { throw new ExtendGremlinStepException( "value " + originalVal - + " is invalid, use ALL_V or END_V instead (case insensitive)"); + + " is invalid, use ALL_V, END_V, ALL_VE instead (case" + + " insensitive)"); } } else if (key.equals("Until")) { this.untilCondition = ObjectUtils.requireNonEmpty(originalVal); diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/ExpandTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/ExpandTest.java index 535506284d91..fab09a6f9e71 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/ExpandTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/ExpandTest.java @@ -94,7 +94,7 @@ public void expand_3_test() { pxdBuilder.literal(10))) .range(1, 3) .pathOpt(GraphOpt.PathExpandPath.SIMPLE) - .resultOpt(GraphOpt.PathExpandResult.AllV) + .resultOpt(GraphOpt.PathExpandResult.ALL_V) .build(); RelNode pathExpand = builder.source( @@ -108,7 +108,7 @@ public void expand_3_test() { + " tables=[knows]}], alias=[DEFAULT], opt=[OUT])\n" + "], getV=[GraphLogicalGetV(tableConfig=[{isAll=false, tables=[person]}]," + " alias=[DEFAULT], fusedFilter=[[=(DEFAULT.age, 10)]], opt=[END])\n" - + "], offset=[1], fetch=[3], path_opt=[SIMPLE], result_opt=[AllV]," + + "], offset=[1], fetch=[3], path_opt=[SIMPLE], result_opt=[ALL_V]," + " alias=[DEFAULT])\n" + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[person]}]," + " alias=[DEFAULT], opt=[VERTEX])", diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/plan/HepPlannerTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/plan/HepPlannerTest.java index 75b9b402bcc3..5ac6cf80038b 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/plan/HepPlannerTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/plan/HepPlannerTest.java @@ -66,7 +66,7 @@ public void push_filter_2_test() { .getV(getVConfig) .range(1, 3) .pathOpt(GraphOpt.PathExpandPath.SIMPLE) - .resultOpt(GraphOpt.PathExpandResult.AllV) + .resultOpt(GraphOpt.PathExpandResult.ALL_V) .build(); RelNode sentence = builder.source( @@ -96,7 +96,7 @@ public void push_filter_2_test() { + " tables=[knows]}], alias=[DEFAULT], opt=[OUT])\n" + "], getV=[GraphLogicalGetV(tableConfig=[{isAll=false, tables=[person]}]," + " alias=[DEFAULT], opt=[END])\n" - + "], offset=[1], fetch=[3], path_opt=[SIMPLE], result_opt=[AllV]," + + "], offset=[1], fetch=[3], path_opt=[SIMPLE], result_opt=[ALL_V]," + " alias=[DEFAULT])\n" + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[person]}]," + " alias=[x], opt=[VERTEX])\n" diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/runtime/FfiLogicalPlanTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/runtime/FfiLogicalPlanTest.java index 50be56a1aa23..1b1e0e85cdbc 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/runtime/FfiLogicalPlanTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/runtime/FfiLogicalPlanTest.java @@ -47,7 +47,7 @@ public void logical_plan_test() throws Exception { .getV(getVConfig) .range(1, 3) .pathOpt(GraphOpt.PathExpandPath.SIMPLE) - .resultOpt(GraphOpt.PathExpandResult.AllV) + .resultOpt(GraphOpt.PathExpandResult.ALL_V) .build(); RelNode node = builder.source( @@ -77,7 +77,7 @@ public void logical_plan_test() throws Exception { + " tables=[knows]}], alias=[DEFAULT], opt=[OUT])\n" + "], getV=[GraphLogicalGetV(tableConfig=[{isAll=false, tables=[person]}]," + " alias=[DEFAULT], opt=[END])\n" - + "], offset=[1], fetch=[3], path_opt=[SIMPLE], result_opt=[AllV]," + + "], offset=[1], fetch=[3], path_opt=[SIMPLE], result_opt=[ALL_V]," + " alias=[DEFAULT])\n" + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[person]}]," + " alias=[x], opt=[VERTEX])\n" diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/antlr4/MatchTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/antlr4/MatchTest.java index f1f9c0f0d4a4..b0580e7b2237 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/antlr4/MatchTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/antlr4/MatchTest.java @@ -92,7 +92,7 @@ public void match_4_test() { + " 1.0E0)]], opt=[OUT])\n" + "], getV=[GraphLogicalGetV(tableConfig=[{isAll=true, tables=[software," + " person]}], alias=[DEFAULT], opt=[END])\n" - + "], offset=[1], fetch=[2], path_opt=[ARBITRARY], result_opt=[EndV]," + + "], offset=[1], fetch=[2], path_opt=[ARBITRARY], result_opt=[END_V]," + " alias=[b])\n" + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[person]}]," + " alias=[a], opt=[VERTEX])\n" diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/gremlin/PathExpandStepTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/gremlin/PathExpandStepTest.java index 2a1eca13878f..578fcacecbac 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/gremlin/PathExpandStepTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/gremlin/PathExpandStepTest.java @@ -59,7 +59,7 @@ public void g_V_path_expand_label_test() { Assert.assertEquals("knows", label.name); } - // g.V().out("1..2").with("Path_Opt", "Simple").with("Result_Opt", "AllV") + // g.V().out("1..2").with("Path_Opt", "Simple").with("Result_Opt", "ALL_V") @Test public void g_V_path_expand_with_test() { Traversal traversal = diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/gremlin/antlr4/PositiveEvalTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/gremlin/antlr4/PositiveEvalTest.java index d8234a1d50c6..942c546a4888 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/gremlin/antlr4/PositiveEvalTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/gremlin/antlr4/PositiveEvalTest.java @@ -928,7 +928,7 @@ public void g_V_has_notEndingWith_test() { eval("g.V().has(\"name\", TextP.notEndingWith(\"marko\"))")); } - // g.V().as("a").select("a").by(out("1..2").with("Result_Opt", AllV).count()) + // g.V().as("a").select("a").by(out("1..2").with("Result_Opt", ALL_V).count()) @Test public void g_V_as_select_a_by_out_1_2_endV_count_test() { Assert.assertEquals( diff --git a/interactive_engine/executor/ir/core/src/plan/ffi.rs b/interactive_engine/executor/ir/core/src/plan/ffi.rs index faf3d1479ca3..0b12ba916cd2 100644 --- a/interactive_engine/executor/ir/core/src/plan/ffi.rs +++ b/interactive_engine/executor/ir/core/src/plan/ffi.rs @@ -2175,6 +2175,7 @@ mod graph { pub enum PathResultOpt { EndV = 0, AllV = 1, + AllVE = 2, } /// To initialize an path expand operator from an edge_expand base diff --git a/interactive_engine/executor/ir/core/src/plan/physical.rs b/interactive_engine/executor/ir/core/src/plan/physical.rs index 80136a12c9b2..df34b16566f6 100644 --- a/interactive_engine/executor/ir/core/src/plan/physical.rs +++ b/interactive_engine/executor/ir/core/src/plan/physical.rs @@ -316,6 +316,7 @@ impl AsPhysical for pb::PathExpand { // 1. the previous op is ExpandE, and with no alias (which means that the edges won't be accessed later). // 2. `GetV` is GetV(Adj) (i.e., opt=Start/End/Other) without any filters or further query semantics. // 3. the direction should be: outE + inV = out; inE + outV = in; and bothE + otherV = both +// In addition, if PathExpand + GetV, make opt of GetV to be `End`. fn build_and_try_fuse_get_v(builder: &mut JobBuilder, mut get_v: pb::GetV) -> IrResult<()> { if get_v.opt == 4 { return Err(IrError::Unsupported("Try to fuse GetV with Opt=Self into ExpandE".to_string())); @@ -352,6 +353,9 @@ fn build_and_try_fuse_get_v(builder: &mut JobBuilder, mut get_v: pb::GetV) -> Ir return Ok(()); } } + } else if let physical_pb::physical_opr::operator::OpKind::Path(ref _path) = op_kind { + // make opt of getV after path expand as End. + get_v.opt = unsafe { std::mem::transmute(physical_pb::get_v::VOpt::End) }; } } builder.get_v(get_v); diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/edge.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/edge.rs index 5ff4116eeb94..cdb42110ea45 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/edge.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/edge.rs @@ -31,7 +31,7 @@ use pegasus_common::impl_as_any; use crate::apis::{read_id, write_id, Details, DynDetails, Element, GraphElement, PropertyValue, ID}; use crate::utils::expr::eval::Context; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct Edge { id: ID, label: Option, diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/mod.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/mod.rs index 25f9ea946df3..4096ce629e0b 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/mod.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/mod.rs @@ -17,7 +17,7 @@ use ahash::HashMap; use dyn_type::{BorrowObject, Object}; pub use edge::Edge; use ir_common::{LabelId, NameOrId}; -pub use path::GraphPath; +pub use path::{GraphPath, VertexOrEdge}; pub use property::{Details, DynDetails, PropKey, PropertyValue}; pub use vertex::Vertex; diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs index 61710cc19cfb..bf5de3e3bfba 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs @@ -13,7 +13,6 @@ //! See the License for the specific language governing permissions and //! limitations under the License. -use std::any::Any; use std::cmp::Ordering; use std::convert::{TryFrom, TryInto}; use std::hash::{Hash, Hasher}; @@ -21,51 +20,91 @@ use std::hash::{Hash, Hasher}; use ahash::HashMap; use dyn_type::{BorrowObject, Object}; use ir_common::error::ParsePbError; -use ir_common::generated::algebra::path_expand::PathOpt; -use ir_common::generated::algebra::path_expand::ResultOpt; +use ir_common::generated::algebra as pb; use ir_common::generated::results as result_pb; use ir_common::{LabelId, NameOrId}; use pegasus::codec::{Decode, Encode, ReadExt, WriteExt}; -use pegasus_common::downcast::*; +use pegasus_common::downcast::Any; +use pegasus_common::downcast::AsAny; use pegasus_common::impl_as_any; -use crate::apis::{Element, GraphElement, PropertyValue, Vertex, ID}; +use crate::apis::{Edge, Element, GraphElement, PropertyValue, Vertex, ID}; + +#[derive(Clone, Debug, Hash, PartialEq, PartialOrd)] +pub enum VertexOrEdge { + V(Vertex), + E(Edge), +} + +impl From for VertexOrEdge { + fn from(v: Vertex) -> Self { + Self::V(v) + } +} + +impl From for VertexOrEdge { + fn from(e: Edge) -> Self { + Self::E(e) + } +} + +impl VertexOrEdge { + pub fn as_vertex(&self) -> Option<&Vertex> { + match self { + VertexOrEdge::V(v) => Some(v), + _ => None, + } + } + + pub fn as_edge(&self) -> Option<&Edge> { + match self { + VertexOrEdge::E(e) => Some(e), + _ => None, + } + } +} #[derive(Clone, Debug)] pub enum GraphPath { - AllV(Vec), - SimpleAllV(Vec), - EndV((Vertex, usize)), - SimpleEndV((Vertex, Vec)), + /// Arbitrary path, which may contain both vertices and edges, or only vertices. + AllPath(Vec), + /// Simple path, which may contains both vertices and edges, or only vertices. + SimpleAllPath(Vec), + /// Arbitrary path with only end vertices preserved, which may contain both vertices and edges, or only vertices. + EndV((VertexOrEdge, usize)), + /// Simple path with only end vertex preserved, which may contains both vertices and edges, or only vertices. + SimpleEndV((VertexOrEdge, Vec)), } -impl_as_any!(GraphPath); - impl GraphPath { - pub fn new(entry: Vertex, path_opt: PathOpt, result_opt: ResultOpt) -> Self { + pub fn new>( + entry: E, path_opt: pb::path_expand::PathOpt, result_opt: pb::path_expand::ResultOpt, + ) -> Self { match result_opt { - ResultOpt::EndV => match path_opt { - PathOpt::Arbitrary => GraphPath::EndV((entry, 1)), - PathOpt::Simple => { + pb::path_expand::ResultOpt::EndV => match path_opt { + pb::path_expand::PathOpt::Arbitrary => GraphPath::EndV((entry.into(), 1)), + pb::path_expand::PathOpt::Simple => { + let entry = entry.into(); let id = entry.id(); GraphPath::SimpleEndV((entry, vec![id])) } }, - ResultOpt::AllV => match path_opt { - PathOpt::Arbitrary => GraphPath::AllV(vec![entry]), - PathOpt::Simple => GraphPath::SimpleAllV(vec![entry]), + pb::path_expand::ResultOpt::AllV | pb::path_expand::ResultOpt::AllVE => match path_opt { + pb::path_expand::PathOpt::Arbitrary => GraphPath::AllPath(vec![entry.into()]), + pb::path_expand::PathOpt::Simple => GraphPath::SimpleAllPath(vec![entry.into()]), }, } } // append an entry and return the flag of whether the entry has been appended or not. - pub fn append(&mut self, entry: Vertex) -> bool { + pub fn append>(&mut self, entry: E) -> bool { match self { - GraphPath::AllV(ref mut path) => { - path.push(entry); + GraphPath::AllPath(ref mut path) => { + path.push(entry.into()); true } - GraphPath::SimpleAllV(ref mut path) => { + GraphPath::SimpleAllPath(ref mut path) => { + let entry = entry.into(); if path.contains(&entry) { false } else { @@ -74,37 +113,91 @@ impl GraphPath { } } GraphPath::EndV((ref mut e, ref mut weight)) => { - *e = entry; + *e = entry.into(); *weight += 1; true } GraphPath::SimpleEndV((ref mut e, ref mut path)) => { + let entry = entry.into(); if path.contains(&entry.id()) { false } else { path.push(entry.id()); - *e = entry; + *e = entry.into(); true } } } } - pub fn get_path_end(&self) -> &Vertex { + pub fn get_path_end(&self) -> &VertexOrEdge { match self { - GraphPath::AllV(ref p) | GraphPath::SimpleAllV(ref p) => p.last().unwrap(), + GraphPath::AllPath(ref p) | GraphPath::SimpleAllPath(ref p) => p.last().unwrap(), GraphPath::EndV((ref e, _)) | GraphPath::SimpleEndV((ref e, _)) => e, } } - pub fn take_path(self) -> Option> { + pub fn take_path(self) -> Option> { match self { - GraphPath::AllV(p) | GraphPath::SimpleAllV(p) => Some(p), + GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) => Some(p), GraphPath::EndV(_) | GraphPath::SimpleEndV(_) => None, } } } +impl Element for VertexOrEdge { + fn as_graph_element(&self) -> Option<&dyn GraphElement> { + match self { + VertexOrEdge::V(v) => v.as_graph_element(), + VertexOrEdge::E(e) => e.as_graph_element(), + } + } + + fn len(&self) -> usize { + match self { + VertexOrEdge::V(v) => v.len(), + VertexOrEdge::E(e) => e.len(), + } + } + + fn as_borrow_object(&self) -> BorrowObject { + match self { + VertexOrEdge::V(v) => v.as_borrow_object(), + VertexOrEdge::E(e) => e.as_borrow_object(), + } + } +} + +impl GraphElement for VertexOrEdge { + fn id(&self) -> ID { + match self { + VertexOrEdge::V(v) => v.id(), + VertexOrEdge::E(e) => e.id(), + } + } + + fn label(&self) -> Option { + match self { + VertexOrEdge::V(v) => v.label(), + VertexOrEdge::E(e) => e.label(), + } + } + + fn get_property(&self, key: &NameOrId) -> Option { + match self { + VertexOrEdge::V(v) => v.get_property(key), + VertexOrEdge::E(e) => e.get_property(key), + } + } + + fn get_all_properties(&self) -> Option> { + match self { + VertexOrEdge::V(v) => v.get_all_properties(), + VertexOrEdge::E(e) => e.get_all_properties(), + } + } +} + impl Element for GraphPath { fn as_graph_element(&self) -> Option<&dyn GraphElement> { Some(self) @@ -113,7 +206,7 @@ impl Element for GraphPath { // the path len is the number of edges in the path; fn len(&self) -> usize { match self { - GraphPath::AllV(p) | GraphPath::SimpleAllV(p) => p.len() - 1, + GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) => p.len() - 1, GraphPath::EndV((_, weight)) => *weight - 1, GraphPath::SimpleEndV((_, p)) => p.len() - 1, } @@ -149,10 +242,10 @@ impl PartialEq for GraphPath { fn eq(&self, other: &Self) -> bool { // We define eq by structure, ignoring path weight match (self, other) { - (GraphPath::AllV(p1), GraphPath::AllV(p2)) - | (GraphPath::AllV(p1), GraphPath::SimpleAllV(p2)) - | (GraphPath::SimpleAllV(p1), GraphPath::AllV(p2)) - | (GraphPath::SimpleAllV(p1), GraphPath::SimpleAllV(p2)) => p1.eq(p2), + (GraphPath::AllPath(p1), GraphPath::AllPath(p2)) + | (GraphPath::AllPath(p1), GraphPath::SimpleAllPath(p2)) + | (GraphPath::SimpleAllPath(p1), GraphPath::AllPath(p2)) + | (GraphPath::SimpleAllPath(p1), GraphPath::SimpleAllPath(p2)) => p1.eq(p2), (GraphPath::EndV((p1, _)), GraphPath::EndV((p2, _))) | (GraphPath::EndV((p1, _)), GraphPath::SimpleEndV((p2, _))) | (GraphPath::SimpleEndV((p1, _)), GraphPath::EndV((p2, _))) @@ -161,15 +254,14 @@ impl PartialEq for GraphPath { } } } - impl PartialOrd for GraphPath { // We define partial_cmp by structure, ignoring path weight fn partial_cmp(&self, other: &Self) -> Option { match (self, other) { - (GraphPath::AllV(p1), GraphPath::AllV(p2)) - | (GraphPath::AllV(p1), GraphPath::SimpleAllV(p2)) - | (GraphPath::SimpleAllV(p1), GraphPath::AllV(p2)) - | (GraphPath::SimpleAllV(p1), GraphPath::SimpleAllV(p2)) => p1.partial_cmp(p2), + (GraphPath::AllPath(p1), GraphPath::AllPath(p2)) + | (GraphPath::AllPath(p1), GraphPath::SimpleAllPath(p2)) + | (GraphPath::SimpleAllPath(p1), GraphPath::AllPath(p2)) + | (GraphPath::SimpleAllPath(p1), GraphPath::SimpleAllPath(p2)) => p1.partial_cmp(p2), (GraphPath::EndV((p1, _)), GraphPath::EndV((p2, _))) | (GraphPath::EndV((p1, _)), GraphPath::SimpleEndV((p2, _))) | (GraphPath::SimpleEndV((p1, _)), GraphPath::EndV((p2, _))) @@ -179,10 +271,43 @@ impl PartialOrd for GraphPath { } } +impl Encode for VertexOrEdge { + fn write_to(&self, writer: &mut W) -> std::io::Result<()> { + match self { + VertexOrEdge::V(v) => { + writer.write_u8(0)?; + v.write_to(writer)?; + } + VertexOrEdge::E(e) => { + writer.write_u8(1)?; + e.write_to(writer)?; + } + } + Ok(()) + } +} + +impl Decode for VertexOrEdge { + fn read_from(reader: &mut R) -> std::io::Result { + let e = reader.read_u8()?; + match e { + 0 => { + let v = ::read_from(reader)?; + Ok(VertexOrEdge::V(v)) + } + 1 => { + let e = ::read_from(reader)?; + Ok(VertexOrEdge::E(e)) + } + _ => Err(std::io::Error::new(std::io::ErrorKind::Other, "unreachable")), + } + } +} + impl Encode for GraphPath { fn write_to(&self, writer: &mut W) -> std::io::Result<()> { match self { - GraphPath::AllV(path) => { + GraphPath::AllPath(path) => { writer.write_u8(0)?; path.write_to(writer)?; } @@ -191,7 +316,7 @@ impl Encode for GraphPath { path_end.write_to(writer)?; writer.write_u64(*weight as u64)?; } - GraphPath::SimpleAllV(path) => { + GraphPath::SimpleAllPath(path) => { writer.write_u8(2)?; path.write_to(writer)?; } @@ -210,20 +335,20 @@ impl Decode for GraphPath { let opt = reader.read_u8()?; match opt { 0 => { - let path = >::read_from(reader)?; - Ok(GraphPath::AllV(path)) + let path = >::read_from(reader)?; + Ok(GraphPath::AllPath(path)) } 1 => { - let vertex_or_edge = ::read_from(reader)?; + let vertex_or_edge = ::read_from(reader)?; let weight = ::read_from(reader)? as usize; Ok(GraphPath::EndV((vertex_or_edge, weight))) } 2 => { - let path = >::read_from(reader)?; - Ok(GraphPath::SimpleAllV(path)) + let path = >::read_from(reader)?; + Ok(GraphPath::SimpleAllPath(path)) } 3 => { - let vertex_or_edge = ::read_from(reader)?; + let vertex_or_edge = ::read_from(reader)?; let path = >::read_from(reader)?; Ok(GraphPath::SimpleEndV((vertex_or_edge, path))) } @@ -232,7 +357,7 @@ impl Decode for GraphPath { } } -impl TryFrom for Vertex { +impl TryFrom for VertexOrEdge { type Error = ParsePbError; fn try_from(e: result_pb::graph_path::VertexOrEdge) -> Result { let vertex_or_edge = e @@ -241,10 +366,11 @@ impl TryFrom for Vertex { match vertex_or_edge { result_pb::graph_path::vertex_or_edge::Inner::Vertex(v) => { let vertex = v.try_into()?; - Ok(vertex) + Ok(VertexOrEdge::V(vertex)) } - result_pb::graph_path::vertex_or_edge::Inner::Edge(_) => { - Err(ParsePbError::Unsupported("Path with edges".to_string())) + result_pb::graph_path::vertex_or_edge::Inner::Edge(e) => { + let edge = e.try_into()?; + Ok(VertexOrEdge::E(edge)) } } } @@ -258,17 +384,17 @@ impl TryFrom for GraphPath { .into_iter() .map(|vertex_or_edge| vertex_or_edge.try_into()) .collect::, _>>()?; - Ok(GraphPath::AllV(graph_path)) + Ok(GraphPath::AllPath(graph_path)) } } impl Hash for GraphPath { fn hash(&self, state: &mut H) { match self { - GraphPath::AllV(p) => p.hash(state), - GraphPath::SimpleAllV(p) => p.hash(state), - GraphPath::EndV((e, _)) => e.hash(state), - GraphPath::SimpleEndV((e, _)) => e.hash(state), + GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) => p.hash(state), + GraphPath::EndV((e, _)) | GraphPath::SimpleEndV((e, _)) => e.hash(state), } } } + +impl_as_any!(GraphPath); diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs index 1379abc7ba1b..cc5e1040033a 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs @@ -167,10 +167,6 @@ impl QueryParams { Ok(self) } - pub fn is_queryable(&self) -> bool { - !(self.labels.is_empty() && self.filter.is_none() && self.limit.is_none() && self.columns.is_none()) - } - pub fn get_extra_param(&self, key: &str) -> Option<&String> { if let Some(ref extra_params) = self.extra_params { extra_params.get(key) diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/mod.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/mod.rs index afc1acd10638..69b8eca326ed 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/mod.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/mod.rs @@ -22,6 +22,7 @@ pub mod write_graph; pub use cluster_info::*; pub use graph::element::{ Details, DynDetails, Edge, Element, GraphElement, GraphPath, PropKey, PropertyValue, Vertex, + VertexOrEdge, }; pub use graph::{read_id, write_id, Direction, QueryParams, ID}; pub use read_graph::{from_fn, get_graph, register_graph, ReadGraph, Statement}; diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/partitioner.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/partitioner.rs index 66817b2bce93..2a41bd8ac4cc 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/partitioner.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/partitioner.rs @@ -50,7 +50,10 @@ impl PartitionedData for Edge { impl PartitionedData for GraphPath { fn get_partition_key_id(&self) -> PartitionKeyId { - self.get_path_end().id() as PartitionKeyId + match self.get_path_end() { + super::VertexOrEdge::V(v) => v.get_partition_key_id(), + super::VertexOrEdge::E(e) => e.get_partition_key_id(), + } } } diff --git a/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs b/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs index fb8daacd4e27..a908c9acee3e 100644 --- a/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs @@ -29,7 +29,7 @@ mod test { use crate::common::test::*; // g.V().hasLabel("person").both("lower..upper", "knows") - // result_opt: 0: EndV, 1: AllV; path_opt: 0: Arbitrary, 1: Simple + // result_opt: 0: EndV, 1: AllV, 2: AllVE; path_opt: 0: Arbitrary, 1: Simple fn init_path_expand_request(range: pb::Range, result_opt: i32, path_opt: i32) -> JobRequest { let source_opr = pb::Scan { scan_opt: 0, @@ -68,14 +68,13 @@ mod test { } // g.V().hasLabel("person").both("2..3", "knows") - fn init_path_expand_exactly_request(is_whole_path: bool) -> JobRequest { - let result_opt = if is_whole_path { 1 } else { 0 }; + // result_opt: 0: EndV, 1: AllV, 2: AllVE; + fn init_path_expand_exactly_request(result_opt: i32) -> JobRequest { init_path_expand_request(pb::Range { lower: 2, upper: 3 }, result_opt, 0) } // g.V().hasLabel("person").both("0..3", "knows") - fn init_path_expand_range_from_zero_request(is_whole_path: bool) -> JobRequest { - let result_opt = if is_whole_path { 1 } else { 0 }; + fn init_path_expand_range_from_zero_request(result_opt: i32) -> JobRequest { init_path_expand_request(pb::Range { lower: 0, upper: 3 }, result_opt, 0) } @@ -250,9 +249,10 @@ mod test { path_expand_end_query(2) } + // both(2..3) fn path_expand_exactly_whole_query(worker_num: u32) { initialize(); - let request = init_path_expand_exactly_request(true); + let request = init_path_expand_exactly_request(1); let mut results = submit_query(request, worker_num); let mut result_collection: Vec> = vec![]; let mut expected_result_paths = @@ -294,7 +294,7 @@ mod test { fn path_expand_exactly_end_query(worker_num: u32) { initialize(); - let request = init_path_expand_exactly_request(false); + let request = init_path_expand_exactly_request(0); let mut results = submit_query(request, worker_num); let mut result_collection = vec![]; let expected_result_path_ends = vec![1, 1, 2, 2, 4, 4]; @@ -327,7 +327,7 @@ mod test { fn path_expand_range_from_zero_whole_query(worker_num: u32) { initialize(); - let request = init_path_expand_range_from_zero_request(true); + let request = init_path_expand_range_from_zero_request(1); let mut results = submit_query(request, worker_num); let mut result_collection: Vec> = vec![]; let mut expected_result_paths = vec![ @@ -625,4 +625,199 @@ mod test { fn path_expand_with_filter_query_w2_test() { path_expand_with_filter_query(2) } + + // both(1..3) with vertices and edges preserved in the path + fn path_expand_whole_v_e_query(worker_num: u32) { + initialize(); + let request = init_path_expand_request(pb::Range { lower: 1, upper: 3 }, 2, 0); + let mut results = submit_query(request, worker_num); + let mut result_collection: Vec> = vec![]; + let mut expected_result_paths: Vec> = vec![ + vec!["v1", "e[1->2]", "v2"], + vec!["v1", "e[1->4]", "v4"], + vec!["v2", "e[1->2]", "v1"], + vec!["v4", "e[1->4]", "v1"], + vec!["v1", "e[1->2]", "v2", "e[1->2]", "v1"], + vec!["v1", "e[1->4]", "v4", "e[1->4]", "v1"], + vec!["v2", "e[1->2]", "v1", "e[1->2]", "v2"], + vec!["v2", "e[1->2]", "v1", "e[1->4]", "v4"], + vec!["v4", "e[1->4]", "v1", "e[1->2]", "v2"], + vec!["v4", "e[1->4]", "v1", "e[1->4]", "v4"], + ] + .into_iter() + .map(|ids| { + ids.into_iter() + .map(|id| id.to_string()) + .collect() + }) + .collect(); + while let Some(result) = results.next() { + match result { + Ok(res) => { + let entry = parse_result(res).unwrap(); + if let Some(path) = entry.get(None).unwrap().as_graph_path() { + let path_collect = path.clone().take_path().unwrap(); + let mut path_ids = vec![]; + for v_or_e in path_collect { + match v_or_e { + graph_proxy::apis::VertexOrEdge::V(v) => { + path_ids.push(format!("v{}", v.id())) + } + graph_proxy::apis::VertexOrEdge::E(e) => { + path_ids.push(format!("e[{}->{}]", e.src_id, e.dst_id)); + } + } + } + result_collection.push(path_ids); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + expected_result_paths.sort(); + result_collection.sort(); + assert_eq!(result_collection, expected_result_paths) + } + + #[test] + fn path_expand_whole_v_e_test() { + path_expand_whole_v_e_query(1) + } + + #[test] + fn path_expand_whole_v_e_w2_test() { + path_expand_whole_v_e_query(2) + } + + // both(0..3) with vertices and edges preserved in the path + fn path_expand_range_from_zero_whole_v_e_query(worker_num: u32) { + initialize(); + let request = init_path_expand_range_from_zero_request(2); + let mut results = submit_query(request, worker_num); + let mut result_collection: Vec> = vec![]; + let mut expected_result_paths: Vec> = vec![ + vec!["v1"], + vec!["v2"], + vec!["v4"], + vec!["v6"], + vec!["v1", "e[1->2]", "v2"], + vec!["v1", "e[1->4]", "v4"], + vec!["v2", "e[1->2]", "v1"], + vec!["v4", "e[1->4]", "v1"], + vec!["v1", "e[1->2]", "v2", "e[1->2]", "v1"], + vec!["v1", "e[1->4]", "v4", "e[1->4]", "v1"], + vec!["v2", "e[1->2]", "v1", "e[1->2]", "v2"], + vec!["v2", "e[1->2]", "v1", "e[1->4]", "v4"], + vec!["v4", "e[1->4]", "v1", "e[1->2]", "v2"], + vec!["v4", "e[1->4]", "v1", "e[1->4]", "v4"], + ] + .into_iter() + .map(|ids| { + ids.into_iter() + .map(|id| id.to_string()) + .collect() + }) + .collect(); + while let Some(result) = results.next() { + match result { + Ok(res) => { + let entry = parse_result(res).unwrap(); + if let Some(path) = entry.get(None).unwrap().as_graph_path() { + let path_collect = path.clone().take_path().unwrap(); + let mut path_ids = vec![]; + for v_or_e in path_collect { + match v_or_e { + graph_proxy::apis::VertexOrEdge::V(v) => { + path_ids.push(format!("v{}", v.id())); + } + graph_proxy::apis::VertexOrEdge::E(e) => { + path_ids.push(format!("e[{}->{}]", e.src_id, e.dst_id)); + } + } + } + result_collection.push(path_ids); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + expected_result_paths.sort(); + result_collection.sort(); + assert_eq!(result_collection, expected_result_paths) + } + + #[test] + fn path_expand_range_from_zero_whole_v_e_test() { + path_expand_range_from_zero_whole_v_e_query(1) + } + + #[test] + fn path_expand_range_from_zero_whole_v_e_w2_test() { + path_expand_range_from_zero_whole_v_e_query(2) + } + + // both(2..3) with vertices and edges preserved in the path + fn path_expand_exactly_whole_v_e_query(worker_num: u32) { + initialize(); + let request = init_path_expand_exactly_request(2); + let mut results = submit_query(request, worker_num); + let mut result_collection: Vec> = vec![]; + let mut expected_result_paths: Vec> = vec![ + vec!["v1", "e[1->2]", "v2", "e[1->2]", "v1"], + vec!["v1", "e[1->4]", "v4", "e[1->4]", "v1"], + vec!["v2", "e[1->2]", "v1", "e[1->2]", "v2"], + vec!["v2", "e[1->2]", "v1", "e[1->4]", "v4"], + vec!["v4", "e[1->4]", "v1", "e[1->2]", "v2"], + vec!["v4", "e[1->4]", "v1", "e[1->4]", "v4"], + ] + .into_iter() + .map(|ids| { + ids.into_iter() + .map(|id| id.to_string()) + .collect() + }) + .collect(); + while let Some(result) = results.next() { + match result { + Ok(res) => { + let entry = parse_result(res).unwrap(); + if let Some(path) = entry.get(None).unwrap().as_graph_path() { + let path_collect = path.clone().take_path().unwrap(); + let mut path_ids = vec![]; + for v_or_e in path_collect { + match v_or_e { + graph_proxy::apis::VertexOrEdge::V(v) => { + path_ids.push(format!("v{}", v.id())); + } + graph_proxy::apis::VertexOrEdge::E(e) => { + path_ids.push(format!("e[{}->{}]", e.src_id, e.dst_id)); + } + } + } + result_collection.push(path_ids); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + expected_result_paths.sort(); + result_collection.sort(); + assert_eq!(result_collection, expected_result_paths) + } + + #[test] + fn path_expand_exactly_whole_v_e_test() { + path_expand_exactly_whole_v_e_query(1) + } + + #[test] + fn path_expand_exactly_whole_v_e_w2_test() { + path_expand_exactly_whole_v_e_query(2) + } } diff --git a/interactive_engine/executor/ir/proto/algebra.proto b/interactive_engine/executor/ir/proto/algebra.proto index 4433a9aa0432..52501591f3f3 100644 --- a/interactive_engine/executor/ir/proto/algebra.proto +++ b/interactive_engine/executor/ir/proto/algebra.proto @@ -249,13 +249,13 @@ message Scan { message GetV { enum VOpt { // Getting the start vertex of the edge/path - Start = 0; + START = 0; // Getting the end vertex of the edge/path - End = 1; + END = 1; // Getting the other vertex of the edge/path. We are calibrating to Gremlin's bothE.otherV semantics - Other = 2; + OTHER = 2; // Getting both vertices of the edge/path - Both = 3; + BOTH = 3; } // The tag that refers to the edge/path where the end vertex will be retrieved common.NameOrId tag = 1; @@ -281,9 +281,9 @@ message EdgeExpand { BOTH = 2; } enum ExpandOpt { - Vertex = 0; - Edge = 1; - Degree = 2; + VERTEX = 0; + EDGE = 1; + DEGREE = 2; } // The tag that refers to the starting vertex common.NameOrId v_tag = 1; @@ -310,17 +310,19 @@ message PathExpand { } enum PathOpt { // an arbitrary path, in which both vertex/edge may duplicate - Arbitrary = 0; + ARBITRARY = 0; // a path without vertex duplications - Simple = 1; + SIMPLE = 1; } // Define what result is required for this path. We currently support `EndV` and `AllV`, while an option to // include all edges and vertices may be needed in the future. enum ResultOpt { // only end vertex is required for this expansion - EndV = 0; + END_V = 0; // all vertices of this path are required for this expansion. - AllV = 1; + ALL_V = 1; + // all vertices and edges of this path are required for this expansion. + ALL_V_E = 2; } // A path expansion has a base expansion ExpandBase base = 1; diff --git a/interactive_engine/executor/ir/proto/physical.proto b/interactive_engine/executor/ir/proto/physical.proto index f6c7d13b6436..26a3ce33211c 100644 --- a/interactive_engine/executor/ir/proto/physical.proto +++ b/interactive_engine/executor/ir/proto/physical.proto @@ -158,15 +158,15 @@ message Scan { message GetV { enum VOpt { // The case when getting the start vertex of the edge - Start = 0; + START = 0; // The case when getting the end vertex of the edge/path - End = 1; + END = 1; // The case when getting the other vertex of the edge. We are calibrating to Gremlin's bothE.otherV semantics - Other = 2; + OTHER = 2; // The case when getting both vertices of the edge - Both = 3; + BOTH = 3; // The case when tag refers to vertices - Itself = 4; + ITSELF = 4; } // The tag that refers to the edge/path where the end vertex will be retrieved google.protobuf.Int32Value tag = 1; @@ -185,9 +185,9 @@ message EdgeExpand { BOTH = 2; } enum ExpandOpt { - Vertex = 0; - Edge = 1; - Degree = 2; + VERTEX = 0; + EDGE = 1; + DEGREE = 2; } // The tag that refers to the starting vertex google.protobuf.Int32Value v_tag = 1; @@ -213,17 +213,19 @@ message PathExpand { } enum PathOpt { // an arbitrary path, in which both vertex/edge may duplicate - Arbitrary = 0; + ARBITRARY = 0; // a path without vertex duplications - Simple = 1; + SIMPLE = 1; } // Define what result is required for this path. We currently support `EndV` and `AllV`, while an option to // include all edges and vertices may be needed in the future. enum ResultOpt { - // only end vertex is required for this expansion - EndV = 0; - // all vertices of this path are required for this expansion. - AllV = 1; + // only end vertex is required for this expansion + END_V = 0; + // all vertices of this path are required for this expansion. + ALL_V = 1; + // all vertices and edges of this path are required for this expansion. + ALL_V_E = 2; } // A path expansion has a base of edge expansion ExpandBase base = 1; diff --git a/interactive_engine/executor/ir/runtime/src/assembly.rs b/interactive_engine/executor/ir/runtime/src/assembly.rs index 39a785e5655a..db2372dd344d 100644 --- a/interactive_engine/executor/ir/runtime/src/assembly.rs +++ b/interactive_engine/executor/ir/runtime/src/assembly.rs @@ -550,8 +550,27 @@ impl IRJobAssembly { .filter_map_with_name("PathStart", move |input| path_start_func.exec(input))?; // path base expand let mut base_expand_plan = vec![]; + // process edge_expand, with opt = ExpandV given by physical plan. if let Some(edge_expand) = base.edge_expand.take() { - base_expand_plan.push(edge_expand.into()); + if pb::path_expand::ResultOpt::AllVE + == unsafe { std::mem::transmute(path.result_opt) } + { + // the case when base expand needs to expand edges + vertices + let mut edge_expand_e = edge_expand.clone(); + edge_expand_e.expand_opt = pb::edge_expand::ExpandOpt::Edge as i32; + let alias = edge_expand_e.alias.take(); + let get_v = pb::GetV { + opt: pb::get_v::VOpt::Other as i32, + tag: None, + params: None, + alias, + }; + base_expand_plan.push(edge_expand_e.into()); + base_expand_plan.push(get_v.into()); + } else { + // the case when base expand needs to expand vertices + base_expand_plan.push(edge_expand.into()); + } } else { Err(FnGenError::from(ParsePbError::ParseError(format!( "empty EdgeExpand of ExpandBase in PathExpand Operator {:?}", @@ -569,6 +588,7 @@ impl IRJobAssembly { .into(), ); } + // process get_v, with opt = Self, given by physical plan (to deal with filtering on vertices). if let Some(getv) = base.get_v.take() { base_expand_plan.push(getv.clone().into()); } diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs index 230f0119a926..f8838476052f 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs @@ -166,7 +166,7 @@ impl FilterMapFunction for ExpandOrIntersect unreachable!() } }); - if let Some(pre_entry) = input.get_column_mut(&self.edge_or_end_v_tag) { + if let Some(pre_entry) = input.get_mut(Some(self.edge_or_end_v_tag)) { // the case of expansion and intersection let pre_intersection = pre_entry .as_any_mut() diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index ef1715045cb1..14dac97de115 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -16,14 +16,14 @@ use std::convert::TryInto; use graph_proxy::apis::GraphElement; -use graph_proxy::apis::{get_graph, DynDetails, QueryParams, Vertex}; +use graph_proxy::apis::{get_graph, DynDetails, GraphPath, QueryParams, Vertex}; use ir_common::error::ParsePbError; use ir_common::generated::physical as pb; use ir_common::generated::physical::get_v::VOpt; use ir_common::{KeyId, LabelId}; use pegasus::api::function::{FilterMapFunction, FnResult}; -use crate::error::{FnExecError, FnGenError, FnGenResult}; +use crate::error::{FnExecError, FnExecResult, FnGenError, FnGenResult}; use crate::process::entry::{DynEntry, Entry, EntryType}; use crate::process::operator::map::FilterMapFuncGen; use crate::process::record::Record; @@ -33,46 +33,28 @@ struct GetVertexOperator { start_tag: Option, opt: VOpt, alias: Option, + query_labels: Vec, } -impl FilterMapFunction for GetVertexOperator { - fn exec(&self, mut input: Record) -> FnResult> { - if let Some(entry) = input.get(self.start_tag) { - if let Some(e) = entry.as_edge() { - let (id, label) = match self.opt { - VOpt::Start => (e.src_id, e.get_src_label()), - VOpt::End => (e.dst_id, e.get_dst_label()), - VOpt::Other => (e.get_other_id(), e.get_other_label()), - _ => unreachable!(), - }; - let vertex = Vertex::new(id, label.map(|l| l.clone()), DynDetails::default()); - input.append(vertex, self.alias.clone()); - Ok(Some(input)) - } else if let Some(graph_path) = entry.as_graph_path() { - // TODO: we do not check VOpt here, and we treat all cases as to get the end vertex of the path. - let path_end = graph_path.get_path_end().clone(); - input.append(path_end, self.alias.clone()); - Ok(Some(input)) +impl GetVertexOperator { + fn contains_label(&self, label: Option<&LabelId>) -> FnExecResult { + if self.query_labels.is_empty() { + // no label constraint + Ok(true) + } else { + if let Some(label) = label { + Ok(self.query_labels.contains(label)) } else { - Err(FnExecError::unexpected_data_error( - "Can only apply `GetV` (`Auxilia` instead) on an edge or path entry", - ))? + Err(FnExecError::UnExpectedData(format!( + "Label is None in GetVertexOperator, with Opr {:?}", + self, + )))? } - } else { - Ok(None) } } } -#[derive(Debug)] -struct GetVertexWithLabelOperator { - start_tag: Option, - opt: VOpt, - alias: Option, - labels: Vec, -} - -impl FilterMapFunction for GetVertexWithLabelOperator { +impl FilterMapFunction for GetVertexOperator { fn exec(&self, mut input: Record) -> FnResult> { if let Some(entry) = input.get(self.start_tag) { if let Some(e) = entry.as_edge() { @@ -82,36 +64,62 @@ impl FilterMapFunction for GetVertexWithLabelOperator { VOpt::Other => (e.get_other_id(), e.get_other_label()), _ => unreachable!(), }; - let label = label.ok_or(FnExecError::UnExpectedData(format!( - "Label is None in GetVertexWithLabelOperator, Record {:?} with Opr {:?}", - input, self, - )))?; - if self.labels.contains(label) { - let vertex = Vertex::new(id, Some(*label), DynDetails::default()); + if self.contains_label(label)? { + let vertex = Vertex::new(id, label.cloned(), DynDetails::default()); input.append(vertex, self.alias.clone()); Ok(Some(input)) } else { Ok(None) } - } else if let Some(graph_path) = entry.as_graph_path() { - if let VOpt::End = self.opt { - let path_end = graph_path.get_path_end(); - let label = path_end - .label() - .ok_or(FnExecError::UnExpectedData(format!( - "Label is None in GetVertexWithLabelOperator, Record {:?} with Opr {:?}", - input, self, - )))?; - if self.labels.contains(&label) { - input.append(path_end.clone(), self.alias.clone()); - Ok(Some(input)) - } else { - Ok(None) + } else if let Some(_) = entry.as_graph_path() { + let graph_path = input + .get_mut(self.start_tag) + .unwrap() + .as_any_mut() + .downcast_mut::() + .ok_or(FnExecError::unexpected_data_error(&format!("entry is not a path in GetV")))?; + // we check VOpt here: + // for `Other`, we treat it as to get_other_id() in the Edge within the Path (in which case is expanding the path with a adj vertex) + // for `End`, we treat it as to get EndV() in the Path (in which case is getting the end vertex from the path) + match self.opt { + VOpt::Other => { + let path_end_edge = graph_path.get_path_end().as_edge().ok_or( + FnExecError::unexpected_data_error(&format!( + "GetOtherVertex on a path entry with input: {:?}", + graph_path.get_path_end() + )), + )?; + let label = path_end_edge.get_other_label(); + if self.contains_label(label)? { + let vertex = Vertex::new( + path_end_edge.get_other_id(), + label.cloned(), + DynDetails::default(), + ); + graph_path.append(vertex); + Ok(Some(input)) + } else { + Ok(None) + } } - } else { - Err(FnExecError::unsupported_error( - "Only support `GetV` with VOpt::End on a path entry", - ))? + VOpt::End => { + let path_end_vertex = graph_path + .get_path_end() + .as_vertex() + .ok_or(FnExecError::unsupported_error("Get end edge on a path entry"))? + .clone(); + let label = path_end_vertex.label(); + if self.contains_label(label.as_ref())? { + input.append(path_end_vertex, self.alias.clone()); + Ok(Some(input)) + } else { + Ok(None) + } + } + _ => Err(FnExecError::unsupported_error(&format!( + "Wired opt in GetVertexOperator for GraphPath: {:?}", + self.opt + )))?, } } else { Err(FnExecError::unexpected_data_error( @@ -241,25 +249,16 @@ impl FilterMapFuncGen for pb::GetV { } } - if !tables_condition.is_empty() { - let get_vertex_with_label_operator = GetVertexWithLabelOperator { - start_tag: self.tag, - opt, - alias: self.alias, - labels: tables_condition, - }; - if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 { - debug!("Runtime GetVertexWithLabelOperator: {:?}", get_vertex_with_label_operator); - } - Ok(Box::new(get_vertex_with_label_operator)) - } else { - let get_vertex_operator = - GetVertexOperator { start_tag: self.tag, opt, alias: self.alias }; - if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 { - debug!("Runtime GetVertexOperator: {:?}", get_vertex_operator); - } - Ok(Box::new(get_vertex_operator)) + let get_vertex_operator = GetVertexOperator { + start_tag: self.tag, + opt, + alias: self.alias, + query_labels: tables_condition, + }; + if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 { + debug!("Runtime GetVertexOperator: {:?}", get_vertex_operator); } + Ok(Box::new(get_vertex_operator)) } VOpt::Itself => { let query_params: QueryParams = self.params.try_into()?; diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs index 59f88b4e6721..44580c81f529 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs @@ -17,6 +17,7 @@ use std::borrow::BorrowMut; use std::collections::HashMap; use dyn_type::Object; +use graph_proxy::apis::VertexOrEdge; use graph_proxy::apis::{Edge, Element, GraphElement, GraphPath, Vertex, ID}; use ir_common::generated::algebra as algebra_pb; use ir_common::generated::algebra::sink_default::MetaType; @@ -188,17 +189,27 @@ impl RecordSinkEncoder { } } - fn vertex_or_edge_to_pb(&self, vertex_or_edge: &Vertex) -> result_pb::graph_path::VertexOrEdge { - let vertex_pb = self.vertex_to_pb(vertex_or_edge); - result_pb::graph_path::VertexOrEdge { - inner: Some(result_pb::graph_path::vertex_or_edge::Inner::Vertex(vertex_pb)), + fn vertex_or_edge_to_pb(&self, vertex_or_edge: &VertexOrEdge) -> result_pb::graph_path::VertexOrEdge { + match vertex_or_edge { + VertexOrEdge::V(v) => { + let vertex_pb = self.vertex_to_pb(v); + result_pb::graph_path::VertexOrEdge { + inner: Some(result_pb::graph_path::vertex_or_edge::Inner::Vertex(vertex_pb)), + } + } + VertexOrEdge::E(e) => { + let edge_pb = self.edge_to_pb(e); + result_pb::graph_path::VertexOrEdge { + inner: Some(result_pb::graph_path::vertex_or_edge::Inner::Edge(edge_pb)), + } + } } } fn path_to_pb(&self, p: &GraphPath) -> result_pb::GraphPath { let mut graph_path_pb = vec![]; match p { - GraphPath::AllV(path) | GraphPath::SimpleAllV(path) => { + GraphPath::AllPath(path) | GraphPath::SimpleAllPath(path) => { for vertex_or_edge in path { let vertex_or_edge_pb = self.vertex_or_edge_to_pb(vertex_or_edge); graph_path_pb.push(vertex_or_edge_pb); diff --git a/interactive_engine/executor/ir/runtime/src/process/record.rs b/interactive_engine/executor/ir/runtime/src/process/record.rs index f1b3a36dd34c..805591f82266 100644 --- a/interactive_engine/executor/ir/runtime/src/process/record.rs +++ b/interactive_engine/executor/ir/runtime/src/process/record.rs @@ -16,7 +16,7 @@ use std::borrow::BorrowMut; use std::hash::Hash; -use graph_proxy::apis::{GraphPath, Vertex}; +use graph_proxy::apis::{Edge, GraphPath, Vertex}; use graph_proxy::utils::expr::eval::Context; use ir_common::{KeyId, NameOrId}; use pegasus::api::function::DynIter; @@ -63,13 +63,6 @@ impl Record { self.curr = entry; } - pub fn get_column_mut(&mut self, tag: &KeyId) -> Option<&mut dyn Entry> { - self.columns - .get_mut(*tag as usize) - .map(|e| e.get_mut()) - .unwrap_or(None) - } - pub fn get_columns_mut(&mut self) -> &mut VecMap { self.columns.borrow_mut() } @@ -82,6 +75,20 @@ impl Record { } } + pub fn get_mut(&mut self, tag: Option) -> Option<&mut dyn Entry> { + if let Some(tag) = tag { + self.columns + .get_mut(tag as usize) + .map(|e| e.get_mut()) + .unwrap_or(None) + } else { + self.curr + .as_mut() + .map(|e| e.get_mut()) + .unwrap_or(None) + } + } + pub fn take(&mut self, tag: Option<&KeyId>) -> Option { if let Some(tag) = tag { self.columns.remove(*tag as usize) @@ -202,13 +209,19 @@ impl Iterator for RecordPathExpandIter { loop { match self.children.next() { Some(mut elem) => { - // currently, we only support GraphPath containing vertices. + // currently, we support GraphPath containing vertices or edges. if let Some(vertex) = elem.as_any_mut().downcast_mut::() { let v = std::mem::replace(vertex, Default::default()); if curr_path.append(v) { record.append(curr_path, None); return Some(record); } + } else if let Some(edge) = elem.as_any_mut().downcast_mut::() { + let e = std::mem::replace(edge, Default::default()); + if curr_path.append(e) { + record.append(curr_path, None); + return Some(record); + } } } None => return None,