Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GAIA] Add storage adapter for graphscope store #521

Merged
merged 10 commits into from
Jul 14, 2021
2 changes: 1 addition & 1 deletion interactive_engine/src/common/rust/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ build = "build.rs"
[dependencies]
serde_json = "1.0"
protobuf = { version = "~2.0", features = ["with-bytes"] }
pnet = "0.23.0"
pnet = "0.27.0"
log = "0.3"
log4rs = "0.8.0"
serde = "1.0.72"
Expand Down
1 change: 1 addition & 0 deletions interactive_engine/src/executor/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ maxgraph-common = { path = "../../common/rust/common" }

[dependencies.rocksdb]
git = "https://github.com/pingcap/rust-rocksdb.git"
rev = "23a604459899f4f7a4c8967f16b6fa623474f27b"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we depend on a general/stable verison?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a historical issue: the MaxGraph storage has depended on this wired version. While it is necessary to migrate to a stable version, I think we can do that by creating another issue.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, created as issue #525


[build-dependencies]
protoc-grpcio = "0.3.0"
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(dead_code)]
use serde::export::PhantomData;
use std::marker::PhantomData;
use protobuf::Message;
use crate::db::api::{GraphResult, GraphError};
use crate::db::api::GraphErrorCode::InvalidData;
Expand Down
9 changes: 1 addition & 8 deletions research/gaia/gremlin/gremlin_core/src/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
use crate::process::traversal::step::*;
use crate::process::traversal::step::{BySubJoin, HasAnyJoin};
use crate::process::traversal::traverser::Traverser;
use crate::structure::Element;
use crate::Partitioner;
use crate::{generated as pb, TraverserSinkEncoder};
use pegasus::api::function::*;
Expand Down Expand Up @@ -55,13 +54,7 @@ impl JobCompiler<Traverser> for GremlinJobCompiler {
let p = self.partitioner.clone();
if let Some(worker_id) = pegasus::get_current_worker() {
let num_workers = worker_id.peers as usize / self.num_servers;
Ok(box_route!(move |t: &Traverser| -> u64 {
if let Some(e) = t.get_element() {
p.get_partition(&e.id(), num_workers)
} else {
0
}
}))
Ok(Box::new(Router { p, num_workers }))
} else {
Err("worker id not found")?
}
Expand Down
10 changes: 5 additions & 5 deletions research/gaia/gremlin/gremlin_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ extern crate enum_dispatch;
extern crate lazy_static;
#[macro_use]
extern crate pegasus_common;
#[macro_use]
extern crate pegasus;
#[macro_use]
extern crate pegasus_config;
Expand Down Expand Up @@ -51,6 +50,7 @@ use crate::result_process::result_to_pb;
use crate::structure::filter::codec::ParseError;
pub use generated::gremlin::GremlinStep as GremlinStepPb;
pub use graph_proxy::{create_demo_graph, ID_MASK};
pub use graph_store::utils::IterList;
use std::io;

#[cfg(feature = "proto_inplace")]
Expand Down Expand Up @@ -91,7 +91,7 @@ impl From<ParseError> for DynError {

/// A tricky bypassing of Rust's compiler. It is useful to simplify throwing a `DynError`
/// from a `&str` as `Err(str_to_dyn_err('some str'))`
fn str_to_dyn_error(str: &str) -> DynError {
pub fn str_to_dyn_error(str: &str) -> DynError {
let err: Box<dyn std::error::Error + Send + Sync> = str.into();
err
}
Expand All @@ -107,7 +107,7 @@ pub trait FromPb<T> {
}

pub trait Partitioner: Send + Sync + 'static {
fn get_partition(&self, id: &ID, job_workers: usize) -> u64;
fn get_partition(&self, id: &ID, job_workers: usize) -> DynResult<u64>;
}

/// A simple partition utility
Expand All @@ -116,7 +116,7 @@ pub struct Partition {
}

impl Partitioner for Partition {
fn get_partition(&self, id: &ID, workers: usize) -> u64 {
fn get_partition(&self, id: &ID, workers: usize) -> DynResult<u64> {
let id_usize = (*id & (ID_MASK)) as usize;
let magic_num = id_usize / self.num_servers;
// The partitioning logics is as follows:
Expand All @@ -125,7 +125,7 @@ impl Partitioner for Partition {
// 2. `R * workers` shifts the worker's id in the machine R.
// 3. `magic_num % workers` then picks up one of the workers in the machine R
// to do the computation.
((id_usize - magic_num * self.num_servers) * workers + magic_num % workers) as u64
Ok(((id_usize - magic_num * self.num_servers) * workers + magic_num % workers) as u64)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ mod order_by;
mod sink;
mod source;
mod sub_traversal;
mod traverser_router;
mod util;

use crate::structure::{Tag, INIT_TAG_NUM};
Expand All @@ -82,4 +83,5 @@ pub use sink::SinkFuncGen;
pub use source::graph_step_from;
pub use source::GraphVertexStep;
pub use sub_traversal::{BySubJoin, GroupBySubJoin, HasAnyJoin, JoinFuncGen, SelectBySubJoin};
pub use traverser_router::Router;
pub use util::result_downcast;
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ impl GraphVertexStep {
pub fn set_src(&mut self, ids: Vec<ID>, partitioner: Arc<dyn Partitioner>) {
let mut partitions = HashMap::new();
for id in ids {
let wid = partitioner.get_partition(&id, self.workers);
partitions.entry(wid).or_insert_with(Vec::new).push(id);
if let Ok(wid) = partitioner.get_partition(&id, self.workers) {
partitions.entry(wid).or_insert_with(Vec::new).push(id);
} else {
debug!("get server id failed in graph_partition_manager in source op");
}
}

self.src = Some(partitions);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//
//! Copyright 2021 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.

use crate::process::traversal::traverser::Traverser;
use crate::{Element, Partitioner};
use pegasus::api::function::{FnResult, RouteFunction};
use std::sync::Arc;

pub struct Router {
pub p: Arc<dyn Partitioner>,
pub num_workers: usize,
}

impl RouteFunction<Traverser> for Router {
fn route(&self, t: &Traverser) -> FnResult<u64> {
if let Some(e) = t.get_element() {
self.p.get_partition(&e.id(), self.num_workers)
} else {
Ok(0)
}
}
}
17 changes: 17 additions & 0 deletions research/gaia/gremlin/gs_gremlin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "gs_gremlin"
version = "0.1.0"
edition = "2018"
yecol marked this conversation as resolved.
Show resolved Hide resolved

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
log = "0.4"
maxgraph-store = {path = "../../../../interactive_engine/src/executor/store"}
gremlin_core = {path="../gremlin_core"}
dyn_type = { path = "../../dyn_type" }
structopt = "0.2"
tokio = { version = "1.0", features = ["macros", "sync"] }
pegasus = { path = "../../pegasus/pegasus" }
pegasus_server = { path = "../../pegasus/server" }
pegasus_common = { path = "../../pegasus/common" }
Loading