diff --git a/interactive_engine/ir-adaptor/src/test/java/com/alibaba/graphscope/ir/maxgraph/RemoteTestGraph.java b/interactive_engine/ir-adaptor/src/test/java/com/alibaba/graphscope/ir/maxgraph/RemoteTestGraph.java index c43e40cce68b..ab6f4635ffb4 100644 --- a/interactive_engine/ir-adaptor/src/test/java/com/alibaba/graphscope/ir/maxgraph/RemoteTestGraph.java +++ b/interactive_engine/ir-adaptor/src/test/java/com/alibaba/graphscope/ir/maxgraph/RemoteTestGraph.java @@ -974,43 +974,44 @@ reason = "unsupported") // todo: return label is integer -@Graph.OptOut( - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", - method = "g_VX1X_outE", - reason = "returned label is id") -@Graph.OptOut( - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", - method = "g_VX4X_bothEXcreatedX", - reason = "returned label is id") -@Graph.OptOut( - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", - method = "g_VX4X_bothE", - reason = "returned label is id") -@Graph.OptOut( - test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasTest", - method = "g_E_hasXlabelXknowsX", - reason = "returned label is id") -@Graph.OptOut( - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", - method = "g_VX2X_inE", - reason = "returned label is id") -@Graph.OptOut( - method = - "g_VX1X_outEXknowsX_asXhereX_hasXweight_1X_asXfakeX_inV_hasXname_joshX_selectXhereX", - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", - reason = "returned label is id") -@Graph.OptOut( - method = "g_VX1X_outEXknowsX_asXhereX_hasXweight_1X_inV_hasXname_joshX_selectXhereX", - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", - reason = "returned label is id") -@Graph.OptOut( - method = "g_VX1X_outEXknowsX_hasXweight_1X_asXhereX_inV_hasXname_joshX_selectXhereX", - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", - reason = "returned label is id") -@Graph.OptOut( - method = "g_VX1X_outE_asXhereX_inV_hasXname_vadasX_selectXhereX", - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", - reason = "returned label is id") +// @Graph.OptOut( +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", +// method = "g_VX1X_outE", +// reason = "returned label is id") +// @Graph.OptOut( +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", +// method = "g_VX4X_bothEXcreatedX", +// reason = "returned label is id") +// @Graph.OptOut( +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", +// method = "g_VX4X_bothE", +// reason = "returned label is id") +// @Graph.OptOut( +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasTest", +// method = "g_E_hasXlabelXknowsX", +// reason = "returned label is id") +// @Graph.OptOut( +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", +// method = "g_VX2X_inE", +// reason = "returned label is id") +// @Graph.OptOut( +// method = +// +// "g_VX1X_outEXknowsX_asXhereX_hasXweight_1X_asXfakeX_inV_hasXname_joshX_selectXhereX", +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", +// reason = "returned label is id") +// @Graph.OptOut( +// method = "g_VX1X_outEXknowsX_asXhereX_hasXweight_1X_inV_hasXname_joshX_selectXhereX", +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", +// reason = "returned label is id") +// @Graph.OptOut( +// method = "g_VX1X_outEXknowsX_hasXweight_1X_asXhereX_inV_hasXname_joshX_selectXhereX", +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", +// reason = "returned label is id") +// @Graph.OptOut( +// method = "g_VX1X_outE_asXhereX_inV_hasXname_vadasX_selectXhereX", +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", +// reason = "returned label is id") // add more ignored tests which are out of ir range @Graph.OptOut( diff --git a/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java index 5a6c111d9aea..2539a15ee45a 100644 --- a/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java +++ b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java @@ -540,7 +540,7 @@ public String getPlanAsJson() throws IOException { if (file.exists()) { file.delete(); } - irCoreLib.write_plan_to_json(ptrPlan, PLAN_JSON_FILE); + irCoreLib.writePlanToJson(ptrPlan, PLAN_JSON_FILE); json = FileUtils.readFileToString(file, StandardCharsets.UTF_8); if (file.exists()) { file.delete(); diff --git a/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/jna/IrCoreLibrary.java b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/jna/IrCoreLibrary.java index 9c8881fba9e5..daba01e0a1a9 100644 --- a/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/jna/IrCoreLibrary.java +++ b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/jna/IrCoreLibrary.java @@ -34,7 +34,7 @@ public interface IrCoreLibrary extends Library { Pointer initLogicalPlan(); - void write_plan_to_json(Pointer plan, String jsonFile); + void writePlanToJson(Pointer plan, String jsonFile); void destroyLogicalPlan(Pointer plan); @@ -224,4 +224,6 @@ FfiError.ByValue appendPatternOperator( FfiError.ByValue setParamsPredicate(Pointer params, String predicate); FfiError.ByValue setParamsIsAllColumns(Pointer params); + + FfiKeyResult.ByValue getKeyName(int keyId, FfiKeyType keyType); } diff --git a/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/jna/type/FfiKeyResult.java b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/jna/type/FfiKeyResult.java new file mode 100644 index 000000000000..a04cb675583b --- /dev/null +++ b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/jna/type/FfiKeyResult.java @@ -0,0 +1,16 @@ +package com.alibaba.graphscope.common.jna.type; + +import com.alibaba.graphscope.common.jna.IrTypeMapper; +import com.sun.jna.Structure; + +@Structure.FieldOrder({"keyName", "error"}) +public class FfiKeyResult extends Structure { + public FfiKeyResult() { + super(IrTypeMapper.INSTANCE); + } + + public static class ByValue extends FfiKeyResult implements Structure.ByValue {} + + public String keyName; + public FfiError.ByValue error; +} diff --git a/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/jna/type/FfiKeyType.java b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/jna/type/FfiKeyType.java new file mode 100644 index 000000000000..d8084feb3052 --- /dev/null +++ b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/common/jna/type/FfiKeyType.java @@ -0,0 +1,23 @@ +package com.alibaba.graphscope.common.jna.type; + +import com.alibaba.graphscope.common.jna.IntEnum; + +public enum FfiKeyType implements IntEnum { + Entity, + Relation, + Column; + + @Override + public int getInt() { + return this.ordinal(); + } + + @Override + public FfiKeyType getEnum(int i) { + FfiKeyType opts[] = values(); + if (i < opts.length && i >= 0) { + return opts[i]; + } + return null; + } +} diff --git a/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/graph/RemoteTestGraph.java b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/graph/RemoteTestGraph.java index 6bdddb47878d..f1a450baf609 100644 --- a/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/graph/RemoteTestGraph.java +++ b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/graph/RemoteTestGraph.java @@ -998,43 +998,44 @@ reason = "unsupported") // todo: return label is integer -@Graph.OptOut( - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", - method = "g_VX1X_outE", - reason = "returned label is id") -@Graph.OptOut( - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", - method = "g_VX4X_bothEXcreatedX", - reason = "returned label is id") -@Graph.OptOut( - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", - method = "g_VX4X_bothE", - reason = "returned label is id") -@Graph.OptOut( - test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasTest", - method = "g_E_hasXlabelXknowsX", - reason = "returned label is id") -@Graph.OptOut( - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", - method = "g_VX2X_inE", - reason = "returned label is id") -@Graph.OptOut( - method = - "g_VX1X_outEXknowsX_asXhereX_hasXweight_1X_asXfakeX_inV_hasXname_joshX_selectXhereX", - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", - reason = "returned label is id") -@Graph.OptOut( - method = "g_VX1X_outEXknowsX_asXhereX_hasXweight_1X_inV_hasXname_joshX_selectXhereX", - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", - reason = "returned label is id") -@Graph.OptOut( - method = "g_VX1X_outEXknowsX_hasXweight_1X_asXhereX_inV_hasXname_joshX_selectXhereX", - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", - reason = "returned label is id") -@Graph.OptOut( - method = "g_VX1X_outE_asXhereX_inV_hasXname_vadasX_selectXhereX", - test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", - reason = "returned label is id") +// @Graph.OptOut( +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", +// method = "g_VX1X_outE", +// reason = "returned label is id") +// @Graph.OptOut( +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", +// method = "g_VX4X_bothEXcreatedX", +// reason = "returned label is id") +// @Graph.OptOut( +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", +// method = "g_VX4X_bothE", +// reason = "returned label is id") +// @Graph.OptOut( +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasTest", +// method = "g_E_hasXlabelXknowsX", +// reason = "returned label is id") +// @Graph.OptOut( +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest", +// method = "g_VX2X_inE", +// reason = "returned label is id") +// @Graph.OptOut( +// method = +// +// "g_VX1X_outEXknowsX_asXhereX_hasXweight_1X_asXfakeX_inV_hasXname_joshX_selectXhereX", +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", +// reason = "returned label is id") +// @Graph.OptOut( +// method = "g_VX1X_outEXknowsX_asXhereX_hasXweight_1X_inV_hasXname_joshX_selectXhereX", +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", +// reason = "returned label is id") +// @Graph.OptOut( +// method = "g_VX1X_outEXknowsX_hasXweight_1X_asXhereX_inV_hasXname_joshX_selectXhereX", +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", +// reason = "returned label is id") +// @Graph.OptOut( +// method = "g_VX1X_outE_asXhereX_inV_hasXname_vadasX_selectXhereX", +// test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest", +// reason = "returned label is id") // add more ignored tests which are out of ir range @Graph.OptOut( diff --git a/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/GremlinResultParserFactory.java b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/GremlinResultParserFactory.java index 8b64981be362..2b0e72e264d2 100644 --- a/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/GremlinResultParserFactory.java +++ b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/GremlinResultParserFactory.java @@ -16,6 +16,7 @@ package com.alibaba.graphscope.gremlin.result; +import com.alibaba.graphscope.common.jna.type.FfiKeyType; import com.alibaba.graphscope.gaia.proto.Common; import com.alibaba.graphscope.gaia.proto.IrResult; import com.alibaba.graphscope.gaia.proto.OuterExpression; @@ -61,7 +62,7 @@ public Object parseFrom(IrResult.Results results) { @Override public Object parseFrom(IrResult.Results results) { IrResult.Record record = results.getRecord(); - logger.info("{}", record); + logger.debug("{}", record); Map projectResult = new HashMap<>(); record.getColumnsList() .forEach( @@ -75,12 +76,13 @@ public Object parseFrom(IrResult.Results results) { projectTags.forEach( (k, v) -> { if (!(v instanceof EmptyValue)) { - String property = (String) k.get(1); - if (property.isEmpty()) { + String nameOrId = (String) k.get(1); + if (nameOrId.isEmpty()) { throw new GremlinResultParserException( "map value should have property" + " key"); } + String property = getPropertyName(nameOrId); Map tagEntry = (Map) projectResult.computeIfAbsent( @@ -124,6 +126,17 @@ private String getColumnKeyAsResultKey(OuterExpression.NameOrId columnKey) { throw new GremlinResultParserException(columnKey.getItemCase() + " is invalid"); } } + + // propertyId is in String format, i.e. "1" + private String getPropertyName(String nameOrId) { + OuterExpression.NameOrId.Builder builder = OuterExpression.NameOrId.newBuilder(); + if (nameOrId.matches("^[0-9]+$")) { + builder.setId(Integer.valueOf(nameOrId)); + } else { + builder.setName(nameOrId); + } + return ParserUtils.getKeyName(builder.build(), FfiKeyType.Column); + } }, GROUP { @Override diff --git a/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/ParserUtils.java b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/ParserUtils.java index 5da391a357dc..c9ff4cf677d3 100644 --- a/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/ParserUtils.java +++ b/research/query_service/ir/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/ParserUtils.java @@ -16,8 +16,11 @@ package com.alibaba.graphscope.gremlin.result; +import com.alibaba.graphscope.common.jna.IrCoreLibrary; +import com.alibaba.graphscope.common.jna.type.*; import com.alibaba.graphscope.gaia.proto.Common; import com.alibaba.graphscope.gaia.proto.IrResult; +import com.alibaba.graphscope.gaia.proto.OuterExpression; import com.alibaba.graphscope.gremlin.exception.GremlinResultParserException; import org.apache.tinkerpop.gremlin.structure.Edge; @@ -34,6 +37,7 @@ public class ParserUtils { private static final Logger logger = LoggerFactory.getLogger(ParserUtils.class); + private static final IrCoreLibrary irCoreLib = IrCoreLibrary.INSTANCE; public static Object parseElement(IrResult.Element element) { switch (element.getInnerCase()) { @@ -109,22 +113,45 @@ private static Object parseCommonValue(Common.Value value) { private static Vertex parseVertex(IrResult.Vertex vertex) { Map properties = parseProperties(vertex.getPropertiesList()); - return new DetachedVertex(vertex.getId(), vertex.getLabel().getName(), properties); + return new DetachedVertex( + vertex.getId(), getKeyName(vertex.getLabel(), FfiKeyType.Entity), properties); } private static Edge parseEdge(IrResult.Edge edge) { Map edgeProperties = parseProperties(edge.getPropertiesList()); return new DetachedEdge( edge.getId(), - edge.getLabel().getName(), + getKeyName(edge.getLabel(), FfiKeyType.Relation), edgeProperties, edge.getSrcId(), - edge.getSrcLabel().getName(), + getKeyName(edge.getSrcLabel(), FfiKeyType.Entity), edge.getDstId(), - edge.getDstLabel().getName()); + getKeyName(edge.getDstLabel(), FfiKeyType.Entity)); } private static Map parseProperties(List properties) { return new HashMap<>(); } + + public static String getKeyName(OuterExpression.NameOrId key, FfiKeyType type) { + switch (key.getItemCase()) { + case NAME: + return key.getName(); + case ID: + { + FfiKeyResult result = irCoreLib.getKeyName(key.getId(), type); + if (result.error == null || result.error.code != ResultCode.Success) { + String errorMsg = + (result.error == null) ? "error code is null" : result.error.msg; + throw new GremlinResultParserException("getKeyName fail " + errorMsg); + } + return result.keyName; + } + default: + // throw new GremlinResultParserException("key type " + key.getItemCase().name() + " + // is invalid"); + logger.error("{}", "key type is not set"); + return ""; + } + } } diff --git a/research/query_service/ir/core/src/plan/ffi.rs b/research/query_service/ir/core/src/plan/ffi.rs index d523f71a3980..18e6689cce5a 100644 --- a/research/query_service/ir/core/src/plan/ffi.rs +++ b/research/query_service/ir/core/src/plan/ffi.rs @@ -65,7 +65,7 @@ use prost::Message; use crate::error::IrError; use crate::plan::logical::LogicalPlan; -use crate::plan::meta::set_schema_from_json; +use crate::plan::meta::{set_schema_from_json, KeyType}; use crate::plan::physical::AsPhysical; use crate::JsonIO; @@ -583,6 +583,81 @@ pub extern "C" fn set_schema(cstr_json: *const c_char) -> FfiError { } } +#[repr(i32)] +#[derive(Copy, Clone)] +pub enum FfiKeyType { + Entity = 0, + Relation = 1, + Column = 2, +} + +impl From for KeyType { + fn from(t: FfiKeyType) -> Self { + match t { + FfiKeyType::Entity => KeyType::Entity, + FfiKeyType::Relation => KeyType::Relation, + FfiKeyType::Column => KeyType::Column, + } + } +} + +#[repr(C)] +#[derive(Copy, Clone)] +pub struct FfiKeyResult { + key_name: *const c_char, + error: FfiError, +} + +impl From for FfiKeyResult { + fn from(error: FfiError) -> Self { + FfiKeyResult { key_name: std::ptr::null::(), error } + } +} + +/// To release a FfiKeyResult +#[no_mangle] +pub extern "C" fn destroy_ffi_key_result(data: FfiKeyResult) { + if !data.key_name.is_null() { + let _ = unsafe { std::ffi::CString::from_raw(data.key_name as *mut c_char) }; + } + if !data.error.msg.is_null() { + let _ = unsafe { std::ffi::CString::from_raw(data.error.msg as *mut c_char) }; + } +} + +/// Query prop_name by given prop_id +#[no_mangle] +pub extern "C" fn get_key_name(key_id: i32, key_type: FfiKeyType) -> FfiKeyResult { + use super::meta::STORE_META; + if let Ok(meta) = STORE_META.read() { + if let Some(schema) = &meta.schema { + if let Some(key_name) = schema.get_name(key_id, key_type.into()) { + FfiKeyResult { + key_name: string_to_cstr(key_name.to_string()).unwrap(), + error: FfiError::success(), + } + } else { + match key_type { + FfiKeyType::Entity | FfiKeyType::Relation => FfiError::new( + ResultCode::TableNotExistError, + format!("label_id {:?} is not found", key_id), + ) + .into(), + FfiKeyType::Column => FfiError::new( + ResultCode::ColumnNotExistError, + format!("prop_id {:?} is not found", key_id), + ) + .into(), + } + } + } else { + FfiError::new(ResultCode::Others, "error getting schema from store meta".to_string()).into() + } + } else { + FfiError::new(ResultCode::Others, "error reading store meta".to_string()).into() + } +} + /// Initialize a logical plan, which expose a pointer for c-like program to access the /// entry of the logical plan. This pointer, however, is owned by Rust, and the caller /// **must not** process any operation, which includes but not limited to deallocate it. diff --git a/research/query_service/ir/runtime/src/process/operator/sink/sink.rs b/research/query_service/ir/runtime/src/process/operator/sink/sink.rs index 7fa1f9929ce9..55c2658b103c 100644 --- a/research/query_service/ir/runtime/src/process/operator/sink/sink.rs +++ b/research/query_service/ir/runtime/src/process/operator/sink/sink.rs @@ -35,8 +35,7 @@ use crate::process::record::{CommonObject, Entry, Record, RecordElement}; pub struct RecordSinkEncoder { /// the given column tags to sink; sink_keys: Vec>, - /// A map from id to name; including type of Entity (Vertex in Graph Database), - /// Relation (Edge in Graph Database), Column (Property in Graph Database), and Tag (Alias). + /// A map from id to name; Now we only support to map Tag (Alias) in Runtime. schema_map: Option>, } @@ -102,14 +101,11 @@ impl RecordSinkEncoder { // a special case to parse key in KV, where the key is vec![tag, prop_name] if let Object::Vector(ref mut v) = key { if v.len() == 2 { + // map tag_id to tag_name if let Ok(tag_id) = v.get(0).unwrap().as_i32() { let mapped_tag = Object::from(self.get_meta_name(tag_id, MetaType::Tag)); *(v[0].borrow_mut()) = mapped_tag; } - if let Ok(prop_id) = v.get(1).unwrap().as_i32() { - let mapped_prop = Object::from(self.get_meta_name(prop_id, MetaType::Column)); - *(v[1].borrow_mut()) = mapped_prop; - } } } let key_pb: common_pb::Value = key.into(); @@ -144,9 +140,7 @@ impl RecordSinkEncoder { fn vertex_to_pb(&self, v: &Vertex) -> result_pb::Vertex { result_pb::Vertex { id: v.id() as i64, - label: v - .label() - .map(|label| self.meta_to_pb(label.clone(), MetaType::Entity)), + label: v.label().map(|label| label.clone().into()), // TODO: return detached vertex without property for now properties: vec![], } @@ -155,17 +149,15 @@ impl RecordSinkEncoder { fn edge_to_pb(&self, e: &Edge) -> result_pb::Edge { result_pb::Edge { id: e.id() as i64, - label: e - .label() - .map(|label| self.meta_to_pb(label.clone(), MetaType::Relation)), + label: e.label().map(|label| label.clone().into()), src_id: e.src_id as i64, src_label: e .get_src_label() - .map(|label| self.meta_to_pb(label.clone(), MetaType::Entity)), + .map(|label| label.clone().into()), dst_id: e.dst_id as i64, dst_label: e .get_dst_label() - .map(|label| self.meta_to_pb(label.clone(), MetaType::Entity)), + .map(|label| label.clone().into()), // TODO: return detached edge without property for now properties: vec![], } @@ -253,190 +245,3 @@ impl SinkFunctionGen for algebra_pb::Sink { } } -#[cfg(test)] -mod tests { - use ir_common::generated::algebra as pb; - use ir_common::generated::common as common_pb; - use ir_common::generated::results as result_pb; - use pegasus::api::{Map, Sink}; - use pegasus::result::ResultStream; - use pegasus::JobConf; - - use crate::graph::element::{Edge, Vertex}; - use crate::graph::property::{DefaultDetails, DynDetails}; - use crate::process::operator::sink::SinkFunctionGen; - use crate::process::record::Record; - - fn sink_test(source: Vec, sink_opr_pb: pb::Sink) -> ResultStream { - let conf = JobConf::new("sink_test"); - let result = pegasus::run(conf, || { - let source = source.clone(); - let sink_opr_pb = sink_opr_pb.clone(); - |input, output| { - let stream = input.input_from(source)?; - let ec = sink_opr_pb.gen_sink().unwrap(); - stream - .map(move |record| ec.exec(record))? - .sink_into(output) - } - }) - .expect("build job failure"); - - result - } - - // g.V() - #[test] - fn sink_vertex_label_mapping_test() { - let v1 = Vertex::new(1, Some(1.into()), DynDetails::new(DefaultDetails::default())); - let v2 = Vertex::new(2, Some(2.into()), DynDetails::new(DefaultDetails::default())); - - let sink_opr_pb = pb::Sink { - tags: vec![common_pb::NameOrIdKey { key: None }], - id_name_mappings: vec![ - pb::sink::IdNameMapping { - id: 1, - name: "person".to_string(), - meta_type: 0, // pb::sink::MetaType::Entity - }, - pb::sink::IdNameMapping { - id: 2, - name: "software".to_string(), - meta_type: 0, // pb::sink::MetaType::Entity - }, - ], - }; - - let mut result = sink_test(vec![Record::new(v1, None), Record::new(v2, None)], sink_opr_pb); - let mut result_id_labels = vec![]; - while let Some(Ok(result_pb)) = result.next() { - if let Some(result_pb::results::Inner::Record(record)) = result_pb.inner { - assert_eq!(record.columns.len(), 1); - let entry = record - .columns - .get(0) - .unwrap() - .entry - .as_ref() - .unwrap(); - if let Some(result_pb::entry::Inner::Element(e)) = entry.inner.as_ref() { - if let Some(result_pb::element::Inner::Vertex(v)) = e.inner.as_ref() { - result_id_labels.push((v.id, v.label.clone().unwrap())) - } - } - } - } - result_id_labels.sort_by(|a, b| a.0.cmp(&b.0)); - - let expected_results = vec![ - ( - 1, - common_pb::NameOrId { item: Some(common_pb::name_or_id::Item::Name("person".to_string())) }, - ), - ( - 2, - common_pb::NameOrId { - item: Some(common_pb::name_or_id::Item::Name("software".to_string())), - }, - ), - ]; - - assert_eq!(result_id_labels, expected_results); - } - - // g.E() - #[test] - fn sink_edge_label_mapping_test() { - // label_mapping: - // vlabel: 11: person, 22: software, - // elabel: 111: create, 222: created_by - let mut e1 = Edge::new(1, Some(111.into()), 1, 2, DynDetails::new(DefaultDetails::default())); - e1.set_src_label(Some(11.into())); - e1.set_dst_label(Some(22.into())); - - let mut e2 = Edge::new(2, Some(222.into()), 2, 1, DynDetails::new(DefaultDetails::default())); - e2.set_src_label(Some(22.into())); - e2.set_dst_label(Some(11.into())); - - let sink_opr_pb = pb::Sink { - tags: vec![common_pb::NameOrIdKey { key: None }], - id_name_mappings: vec![ - pb::sink::IdNameMapping { - id: 11, - name: "person".to_string(), - meta_type: 0, // pb::sink::MetaType::Entity - }, - pb::sink::IdNameMapping { - id: 22, - name: "software".to_string(), - meta_type: 0, // pb::sink::MetaType::Entity - }, - pb::sink::IdNameMapping { - id: 111, - name: "create".to_string(), - meta_type: 1, // pb::sink::MetaType::Relation - }, - pb::sink::IdNameMapping { - id: 222, - name: "created_by".to_string(), - meta_type: 1, // pb::sink::MetaType::Relation - }, - ], - }; - - let mut result = sink_test(vec![Record::new(e1, None), Record::new(e2, None)], sink_opr_pb); - let mut result_eid_labels = vec![]; - while let Some(Ok(result_pb)) = result.next() { - if let Some(result_pb::results::Inner::Record(record)) = result_pb.inner { - assert_eq!(record.columns.len(), 1); - let entry = record - .columns - .get(0) - .unwrap() - .entry - .as_ref() - .unwrap(); - if let Some(result_pb::entry::Inner::Element(e)) = entry.inner.as_ref() { - if let Some(result_pb::element::Inner::Edge(e)) = e.inner.as_ref() { - result_eid_labels.push(( - e.src_id, - e.src_label.clone().unwrap(), - e.dst_id, - e.dst_label.clone().unwrap(), - e.id, - e.label.clone().unwrap(), - )); - } - } - } - } - result_eid_labels.sort_by(|a, b| a.0.cmp(&b.0)); - - let expected_results = vec![ - ( - 1, - common_pb::NameOrId { item: Some(common_pb::name_or_id::Item::Name("person".to_string())) }, - 2, - common_pb::NameOrId { - item: Some(common_pb::name_or_id::Item::Name("software".to_string())), - }, - 1, - common_pb::NameOrId { item: Some(common_pb::name_or_id::Item::Name("create".to_string())) }, - ), - ( - 2, - common_pb::NameOrId { - item: Some(common_pb::name_or_id::Item::Name("software".to_string())), - }, - 1, - common_pb::NameOrId { item: Some(common_pb::name_or_id::Item::Name("person".to_string())) }, - 2, - common_pb::NameOrId { - item: Some(common_pb::name_or_id::Item::Name("created_by".to_string())), - }, - ), - ]; - - assert_eq!(result_eid_labels, expected_results); - } -}