Skip to content

Commit

Permalink
[GAIA] Add storage adapter for graphscope store (#521)
Browse files Browse the repository at this point in the history
* Add adapter for querying GraphScope store in GAIA.
* Add MultiPartition for v2 store to route the vid/eid query request to the partition that holds its data and assign an worker to do the query.
* Fix compile error with dependency on GraphScope store.
  • Loading branch information
BingqingLyu committed Jul 14, 2021
1 parent ae6b8c5 commit 55adea9
Show file tree
Hide file tree
Showing 12 changed files with 594 additions and 17 deletions.
2 changes: 1 addition & 1 deletion interactive_engine/src/common/rust/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ build = "build.rs"
[dependencies]
serde_json = "1.0"
protobuf = { version = "~2.0", features = ["with-bytes"] }
pnet = "0.23.0"
pnet = "0.27.0"
log = "0.3"
log4rs = "0.8.0"
serde = "1.0.72"
Expand Down
1 change: 1 addition & 0 deletions interactive_engine/src/executor/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ maxgraph-common = { path = "../../common/rust/common" }

[dependencies.rocksdb]
git = "https://github.com/pingcap/rust-rocksdb.git"
rev = "23a604459899f4f7a4c8967f16b6fa623474f27b"

[build-dependencies]
protoc-grpcio = "0.3.0"
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(dead_code)]
use serde::export::PhantomData;
use std::marker::PhantomData;
use protobuf::Message;
use crate::db::api::{GraphResult, GraphError};
use crate::db::api::GraphErrorCode::InvalidData;
Expand Down
9 changes: 1 addition & 8 deletions research/gaia/gremlin/gremlin_core/src/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
use crate::process::traversal::step::*;
use crate::process::traversal::step::{BySubJoin, HasAnyJoin};
use crate::process::traversal::traverser::Traverser;
use crate::structure::Element;
use crate::Partitioner;
use crate::{generated as pb, TraverserSinkEncoder};
use pegasus::api::function::*;
Expand Down Expand Up @@ -55,13 +54,7 @@ impl JobCompiler<Traverser> for GremlinJobCompiler {
let p = self.partitioner.clone();
if let Some(worker_id) = pegasus::get_current_worker() {
let num_workers = worker_id.peers as usize / self.num_servers;
Ok(box_route!(move |t: &Traverser| -> u64 {
if let Some(e) = t.get_element() {
p.get_partition(&e.id(), num_workers)
} else {
0
}
}))
Ok(Box::new(Router { p, num_workers }))
} else {
Err("worker id not found")?
}
Expand Down
10 changes: 5 additions & 5 deletions research/gaia/gremlin/gremlin_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ extern crate enum_dispatch;
extern crate lazy_static;
#[macro_use]
extern crate pegasus_common;
#[macro_use]
extern crate pegasus;
#[macro_use]
extern crate pegasus_config;
Expand Down Expand Up @@ -51,6 +50,7 @@ use crate::result_process::result_to_pb;
use crate::structure::filter::codec::ParseError;
pub use generated::gremlin::GremlinStep as GremlinStepPb;
pub use graph_proxy::{create_demo_graph, ID_MASK};
pub use graph_store::utils::IterList;
use std::io;

#[cfg(feature = "proto_inplace")]
Expand Down Expand Up @@ -91,7 +91,7 @@ impl From<ParseError> for DynError {

/// A tricky bypassing of Rust's compiler. It is useful to simplify throwing a `DynError`
/// from a `&str` as `Err(str_to_dyn_err('some str'))`
fn str_to_dyn_error(str: &str) -> DynError {
pub fn str_to_dyn_error(str: &str) -> DynError {
let err: Box<dyn std::error::Error + Send + Sync> = str.into();
err
}
Expand All @@ -107,7 +107,7 @@ pub trait FromPb<T> {
}

pub trait Partitioner: Send + Sync + 'static {
fn get_partition(&self, id: &ID, job_workers: usize) -> u64;
fn get_partition(&self, id: &ID, job_workers: usize) -> DynResult<u64>;
}

/// A simple partition utility
Expand All @@ -116,7 +116,7 @@ pub struct Partition {
}

impl Partitioner for Partition {
fn get_partition(&self, id: &ID, workers: usize) -> u64 {
fn get_partition(&self, id: &ID, workers: usize) -> DynResult<u64> {
let id_usize = (*id & (ID_MASK)) as usize;
let magic_num = id_usize / self.num_servers;
// The partitioning logics is as follows:
Expand All @@ -125,7 +125,7 @@ impl Partitioner for Partition {
// 2. `R * workers` shifts the worker's id in the machine R.
// 3. `magic_num % workers` then picks up one of the workers in the machine R
// to do the computation.
((id_usize - magic_num * self.num_servers) * workers + magic_num % workers) as u64
Ok(((id_usize - magic_num * self.num_servers) * workers + magic_num % workers) as u64)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ mod order_by;
mod sink;
mod source;
mod sub_traversal;
mod traverser_router;
mod util;

use crate::structure::{Tag, INIT_TAG_NUM};
Expand All @@ -82,4 +83,5 @@ pub use sink::SinkFuncGen;
pub use source::graph_step_from;
pub use source::GraphVertexStep;
pub use sub_traversal::{BySubJoin, GroupBySubJoin, HasAnyJoin, JoinFuncGen, SelectBySubJoin};
pub use traverser_router::Router;
pub use util::result_downcast;
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ impl GraphVertexStep {
pub fn set_src(&mut self, ids: Vec<ID>, partitioner: Arc<dyn Partitioner>) {
let mut partitions = HashMap::new();
for id in ids {
let wid = partitioner.get_partition(&id, self.workers);
partitions.entry(wid).or_insert_with(Vec::new).push(id);
if let Ok(wid) = partitioner.get_partition(&id, self.workers) {
partitions.entry(wid).or_insert_with(Vec::new).push(id);
} else {
debug!("get server id failed in graph_partition_manager in source op");
}
}

self.src = Some(partitions);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//
//! Copyright 2021 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.

use crate::process::traversal::traverser::Traverser;
use crate::{Element, Partitioner};
use pegasus::api::function::{FnResult, RouteFunction};
use std::sync::Arc;

pub struct Router {
pub p: Arc<dyn Partitioner>,
pub num_workers: usize,
}

impl RouteFunction<Traverser> for Router {
fn route(&self, t: &Traverser) -> FnResult<u64> {
if let Some(e) = t.get_element() {
self.p.get_partition(&e.id(), self.num_workers)
} else {
Ok(0)
}
}
}
17 changes: 17 additions & 0 deletions research/gaia/gremlin/gs_gremlin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "gs_gremlin"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
log = "0.4"
maxgraph-store = {path = "../../../../interactive_engine/src/executor/store"}
gremlin_core = {path="../gremlin_core"}
dyn_type = { path = "../../dyn_type" }
structopt = "0.2"
tokio = { version = "1.0", features = ["macros", "sync"] }
pegasus = { path = "../../pegasus/pegasus" }
pegasus_server = { path = "../../pegasus/server" }
pegasus_common = { path = "../../pegasus/common" }
Loading

0 comments on commit 55adea9

Please sign in to comment.