Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GIE/Runtime] Sink all tagged columns as default if no tags given in Sink #2489

Merged
merged 6 commits
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 215 additions & 0 deletions interactive_engine/executor/ir/integrated/tests/sink_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
//
//! Copyright 2021 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.
//!
//!

mod common;

#[cfg(test)]
mod test {

use graph_proxy::apis::GraphElement;
use graph_store::common::DefaultId;
use graph_store::ldbc::LDBCVertexParser;
use ir_common::generated::algebra as pb;
use ir_common::generated::common as common_pb;
use runtime::process::entry::Entry;

use crate::common::test::*;
use ir_common::KeyId;
use ir_physical_client::physical_builder::JobBuilder;
use pegasus_server::JobRequest;

/// Builds a `JobRequest` that scans all vertices (`g.V()`), optionally
/// aliasing the scanned column with `source_alias`, and then sinks the
/// columns named by `sink_keys` to the default sink target.
fn init_sink_request(
    source_alias: Option<KeyId>, sink_keys: Vec<common_pb::NameOrIdKey>,
) -> JobRequest {
    // g.V(), with an optional alias on the scanned vertices
    let scan = pb::Scan {
        scan_opt: 0,
        alias: source_alias.map(Into::into),
        params: Some(query_params(vec![], vec![], None)),
        idx_predicate: None,
    };
    let sink = pb::Sink { tags: sink_keys, sink_target: default_sink_target() };

    let mut builder = JobBuilder::default();
    builder.add_scan_source(scan);
    builder.sink(sink);
    builder.build().unwrap()
}

// g.V().as(0) + Sink(0)
#[test]
fn sink_with_alias() {
    initialize();
    let request = init_sink_request(Some(0), vec![common_pb::NameOrIdKey { key: Some(0.into()) }]);
    let mut results = submit_query(request, 1);
    // all six vertices of the test graph, by (local id, label id)
    let mut expected_result_ids: Vec<DefaultId> = vec![
        LDBCVertexParser::to_global_id(1, 0),
        LDBCVertexParser::to_global_id(2, 0),
        LDBCVertexParser::to_global_id(3, 1),
        LDBCVertexParser::to_global_id(4, 0),
        LDBCVertexParser::to_global_id(5, 1),
        LDBCVertexParser::to_global_id(6, 0),
    ];
    let mut result_collection = vec![];
    while let Some(result) = results.next() {
        match result {
            Ok(res) => {
                let entry = parse_result(res).unwrap();
                // the head (untagged) column must not be sinked
                assert!(entry.get(None).is_none());
                if let Some(vertex) = entry.get(Some(0)).unwrap().as_vertex() {
                    result_collection.push(vertex.id() as usize);
                }
            }
            Err(e) => panic!("err result {:?}", e),
        }
    }
    expected_result_ids.sort();
    result_collection.sort();
    assert_eq!(result_collection, expected_result_ids)
}

// g.V() + Sink(None)
#[test]
fn sink_with_head() {
    initialize();
    let request = init_sink_request(None, vec![common_pb::NameOrIdKey { key: None }]);
    let mut results = submit_query(request, 1);
    // all six vertices of the test graph, by (local id, label id)
    let mut expected_result_ids: Vec<DefaultId> = vec![
        LDBCVertexParser::to_global_id(1, 0),
        LDBCVertexParser::to_global_id(2, 0),
        LDBCVertexParser::to_global_id(3, 1),
        LDBCVertexParser::to_global_id(4, 0),
        LDBCVertexParser::to_global_id(5, 1),
        LDBCVertexParser::to_global_id(6, 0),
    ];
    let mut result_collection = vec![];
    while let Some(result) = results.next() {
        match result {
            Ok(res) => {
                let entry = parse_result(res).unwrap();
                // the head (untagged) column was explicitly requested
                if let Some(vertex) = entry.get(None).unwrap().as_vertex() {
                    result_collection.push(vertex.id() as usize);
                }
            }
            Err(e) => panic!("err result {:?}", e),
        }
    }
    expected_result_ids.sort();
    result_collection.sort();
    assert_eq!(result_collection, expected_result_ids)
}

// g.V().as(0) + Sink(0, None)
#[test]
fn sink_with_head_and_alias() {
    initialize();
    let request = init_sink_request(
        Some(0.into()),
        vec![common_pb::NameOrIdKey { key: None }, common_pb::NameOrIdKey { key: Some(0.into()) }],
    );
    let mut results = submit_query(request, 1);
    // all six vertices of the test graph, by (local id, label id)
    let mut expected_result_ids: Vec<DefaultId> = vec![
        LDBCVertexParser::to_global_id(1, 0),
        LDBCVertexParser::to_global_id(2, 0),
        LDBCVertexParser::to_global_id(3, 1),
        LDBCVertexParser::to_global_id(4, 0),
        LDBCVertexParser::to_global_id(5, 1),
        LDBCVertexParser::to_global_id(6, 0),
    ];
    let mut result_collection = vec![];
    let mut result_head_collection = vec![];
    while let Some(result) = results.next() {
        match result {
            Ok(res) => {
                let entry = parse_result(res).unwrap();
                // both the head column and the aliased column are sinked
                if let Some(vertex) = entry.get(None).unwrap().as_vertex() {
                    result_head_collection.push(vertex.id() as usize);
                }
                if let Some(vertex) = entry.get(Some(0)).unwrap().as_vertex() {
                    result_collection.push(vertex.id() as usize);
                }
            }
            Err(e) => panic!("err result {:?}", e),
        }
    }
    expected_result_ids.sort();
    result_collection.sort();
    result_head_collection.sort();
    assert_eq!(result_collection, expected_result_ids);
    assert_eq!(result_head_collection, expected_result_ids);
}

// g.V().as(0) + Sink empty tags (i.e., here is Sink(0) by default)
#[test]
fn sink_default_for_aliased_records() {
    initialize();
    let request = init_sink_request(Some(0.into()), vec![]);
    let mut results = submit_query(request, 1);
    // all six vertices of the test graph, by (local id, label id)
    let mut expected_result_ids: Vec<DefaultId> = vec![
        LDBCVertexParser::to_global_id(1, 0),
        LDBCVertexParser::to_global_id(2, 0),
        LDBCVertexParser::to_global_id(3, 1),
        LDBCVertexParser::to_global_id(4, 0),
        LDBCVertexParser::to_global_id(5, 1),
        LDBCVertexParser::to_global_id(6, 0),
    ];
    let mut result_collection = vec![];
    while let Some(result) = results.next() {
        match result {
            Ok(res) => {
                let entry = parse_result(res).unwrap();
                // with empty tags, only tagged columns are sinked; the head is not
                assert!(entry.get(None).is_none());
                if let Some(vertex) = entry.get(Some(0)).unwrap().as_vertex() {
                    result_collection.push(vertex.id() as usize);
                }
            }
            Err(e) => panic!("err result {:?}", e),
        }
    }
    expected_result_ids.sort();
    result_collection.sort();
    assert_eq!(result_collection, expected_result_ids);
}

// g.V() + Sink empty tags (i.e., nothing is sinked by default since no column is tagged.)
#[test]
fn sink_default_for_unaliased_records() {
    initialize();
    let mut results = submit_query(init_sink_request(None, vec![]), 1);
    while let Some(result) = results.next() {
        match result {
            Ok(res) => {
                let entry = parse_result(res).unwrap();
                // the head is never sinked by default, and no tagged column exists
                assert!(entry.get(None).is_none());
            }
            Err(e) => panic!("err result {:?}", e),
        }
    }
}
}
2 changes: 1 addition & 1 deletion interactive_engine/executor/ir/proto/algebra.proto
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ message Sink {
SinkVineyard sink_vineyard = 2;
}
}
// Define the tags of columns to sink
// Define the tags of columns to sink. If no tags given, sink all **tagged** columns by default.
repeated common.NameOrIdKey tags = 1;
// Define the target of sink, e.g., to Client as default, to Graph such as Vineyard etc.
SinkTarget sink_target = 2;
Expand Down
2 changes: 1 addition & 1 deletion interactive_engine/executor/ir/proto/physical.proto
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ message Sink {
message OptTag {
google.protobuf.Int32Value tag = 1;
}
// Define the tags of columns to sink
// Define the tags of columns to sink. If no tags given, sink all **tagged** columns by default.
repeated OptTag tags = 1;
// Define the target of sink, e.g., to Client as default, to Graph such as Vineyard etc.
algebra.Sink.SinkTarget sink_target = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ impl RecordSinkEncoder {
return NameOrId::Str(meta_name.clone());
}
}
// if we can not find mapped meta_name, we return meta_id.to_string() instead.
NameOrId::Str(meta_id.to_string())
// if we can not find mapped meta_name, we return meta_id directly.
NameOrId::Id(meta_id)
}

fn vid_to_pb(&self, vid: &ID) -> result_pb::Vertex {
Expand Down Expand Up @@ -214,19 +214,32 @@ impl RecordSinkEncoder {
}

impl MapFunction<Record, Vec<u8>> for RecordSinkEncoder {
fn exec(&self, input: Record) -> FnResult<Vec<u8>> {
fn exec(&self, mut input: Record) -> FnResult<Vec<u8>> {
let mut sink_columns = Vec::with_capacity(self.sink_keys.len());
for sink_key in self.sink_keys.iter() {
if let Some(entry) = input.get(sink_key.clone()) {
if self.sink_keys.is_empty() {
// the case of sink all **tagged** columns by default.
let columns = input.get_columns_mut();
for (sink_key, entry) in columns.into_iter() {
let entry_pb = self.entry_to_pb(entry);
let column_pb = result_pb::Column {
name_or_id: sink_key
.clone()
.map(|sink_key| self.meta_to_pb(NameOrId::Id(sink_key), MetaType::Tag)),
name_or_id: Some(self.meta_to_pb(NameOrId::Id(sink_key as KeyId), MetaType::Tag)),
entry: Some(entry_pb),
};
sink_columns.push(column_pb);
}
} else {
for sink_key in self.sink_keys.iter() {
if let Some(entry) = input.get(sink_key.clone()) {
let entry_pb = self.entry_to_pb(entry);
let column_pb = result_pb::Column {
name_or_id: sink_key
.clone()
.map(|sink_key| self.meta_to_pb(NameOrId::Id(sink_key), MetaType::Tag)),
entry: Some(entry_pb),
};
sink_columns.push(column_pb);
}
}
}

let record_pb = result_pb::Record { columns: sink_columns };
Expand Down