Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[store_adapter] Make common functions/macros public for storage adapt #519

Merged
merged 1 commit into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions research/gaia/gremlin/gremlin_core/src/graph_proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.

mod storage;
use crate::structure::Statement;
use crate::DynResult;
use pegasus::api::function::DynIter;
pub use storage::{create_demo_graph, encode_store_e_id, ID_MASK};

pub fn from_fn<I, O, F>(func: F) -> Box<dyn Statement<I, O>>
where
F: Fn(I) -> DynResult<DynIter<O>> + Send + Sync + 'static,
{
Box::new(func) as Box<dyn Statement<I, O>>
}

#[macro_export]
macro_rules! limit_n {
($iter: expr, $n: expr) => {
if let Some(limit) = $n {
let r = $iter.take(limit);
Box::new(r)
} else {
Box::new($iter)
}
};
}

#[macro_export]
macro_rules! filter_limit {
($iter: expr, $f: expr, $n: expr) => {
if let Some(ref f) = $f {
let f = f.clone();
let r = $iter.filter(move |v| f.test(v).unwrap_or(false));
limit_n!(r, $n)
} else {
let r = $iter;
limit_n!(r, $n)
}
};
}

#[macro_export]
macro_rules! filter_limit_ok {
($iter: expr, $f: expr, $n: expr) => {
if let Some(ref f) = $f {
let f = f.clone();
let r = $iter.filter(move |v| f.test(v).unwrap_or(false)).map(|v| Ok(v));
limit_n!(r, $n)
} else {
let r = $iter.map(|v| Ok(v));
limit_n!(r, $n)
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@
//! See the License for the specific language governing permissions and
//! limitations under the License.

use crate::graph_proxy::from_fn;
use crate::structure::{
DefaultDetails, Details, Direction, DynDetails, Edge, Label, PropKey, QueryParams, Statement,
Vertex, ID_BITS,
DefaultDetails, Details, Direction, DynDetails, Edge, Label, LabelId, PropKey, QueryParams,
Statement, Vertex, ID_BITS,
};
use crate::{filter_limit, filter_limit_ok, limit_n};
use crate::{register_graph, DynResult, GraphProxy, ID};
use dyn_type::BorrowObject;
use graph_store::config::{JsonConf, DIR_GRAPH_SCHEMA, FILE_SCHEMA};
use graph_store::ldbc::LDBCVertexParser;
use graph_store::prelude::{
DefaultId, EdgeId, GlobalStoreTrait, GlobalStoreUpdate, GraphDBConfig, InternalId,
LDBCGraphSchema, LabelId, LargeGraphDB, LocalEdge, LocalVertex, MutableGraphDB, Row,
INVALID_LABEL_ID,
LDBCGraphSchema, LargeGraphDB, LocalEdge, LocalVertex, MutableGraphDB, Row, INVALID_LABEL_ID,
};
use pegasus::api::function::DynIter;
use pegasus_common::downcast::*;
use std::collections::HashMap;
use std::path::Path;
Expand Down Expand Up @@ -187,30 +187,6 @@ fn _init_modern_graph() -> LargeGraphDB<DefaultId, InternalId> {
mut_graph.into_graph(schema)
}

macro_rules! limit_n {
($iter: expr, $n: expr) => {
if let Some(limit) = $n {
let r = $iter.take(limit);
Box::new(r)
} else {
Box::new($iter)
}
};
}

macro_rules! filter_limit_ok {
($iter: expr, $f: expr, $n: expr) => {
if let Some(ref f) = $f {
let f = f.clone();
let r = $iter.filter(move |v| f.test(v).unwrap_or(false)).map(|v| Ok(v));
limit_n!(r, $n)
} else {
let r = $iter.map(|v| Ok(v));
limit_n!(r, $n)
}
};
}

impl GraphProxy for DemoGraph {
fn scan_vertex(
&self, params: &QueryParams<Vertex>,
Expand All @@ -224,13 +200,7 @@ impl GraphProxy for DemoGraph {
// to_runtime_vertex_with_property(v, params.props.as_ref())
});

if let Some(ref filter) = params.filter {
let f = filter.clone();
let result = result.filter(move |v| f.test(v).unwrap_or(false));
Ok(limit_n!(result, params.limit))
} else {
Ok(limit_n!(result, params.limit))
}
Ok(filter_limit!(result, params.filter, params.limit))
}

fn scan_edge(
Expand All @@ -241,13 +211,7 @@ impl GraphProxy for DemoGraph {
let result =
self.store.get_all_edges(label_ids.as_ref()).map(move |e| to_runtime_edge(e, store));

if let Some(ref filter) = params.filter {
let f = filter.clone();
let result = result.filter(move |e| f.test(e).unwrap_or(false));
Ok(limit_n!(result, params.limit))
} else {
Ok(limit_n!(result, params.limit))
}
Ok(filter_limit!(result, params.filter, params.limit))
}

fn get_vertex(
Expand All @@ -261,17 +225,10 @@ impl GraphProxy for DemoGraph {
} else {
to_runtime_vertex(local_vertex, self.store)
};
if let Some(ref filter) = params.filter {
if filter.test(&v).unwrap_or(false) {
result.push(v);
}
} else {
result.push(v);
}
result.push(v);
}
}

DynResult::Ok(Box::new(result.into_iter()))
Ok(filter_limit!(result.into_iter(), params.filter, None))
}

fn get_edge(
Expand All @@ -282,17 +239,10 @@ impl GraphProxy for DemoGraph {
let eid = encode_store_e_id(id);
if let Some(local_edge) = self.store.get_edge(eid) {
let e = to_runtime_edge(local_edge, self.store);
if let Some(ref filter) = params.filter {
if filter.test(&e).unwrap_or(false) {
result.push(e);
}
} else {
result.push(e);
}
result.push(e);
}
}

DynResult::Ok(Box::new(result.into_iter()))
Ok(filter_limit!(result.into_iter(), params.filter, None))
}

fn prepare_explore_vertex(
Expand Down Expand Up @@ -486,14 +436,6 @@ impl Details for LazyEdgeDetails {
}
}

#[inline]
fn from_fn<I, O, F>(func: F) -> Box<dyn Statement<I, O>>
where
F: Fn(I) -> DynResult<DynIter<O>> + Send + Sync + 'static,
{
Box::new(func) as Box<dyn Statement<I, O>>
}

fn encode_runtime_v_id(v: &LocalVertex<DefaultId>) -> ID {
v.get_id() as ID
}
Expand Down
5 changes: 3 additions & 2 deletions research/gaia/gremlin/gremlin_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ pub mod structure;

pub mod compiler;
mod result_process;
mod storage;
#[macro_use]
pub mod graph_proxy;

use crate::result_process::result_to_pb;
use crate::structure::filter::codec::ParseError;
pub use generated::gremlin::GremlinStep as GremlinStepPb;
pub use graph_proxy::{create_demo_graph, ID_MASK};
use std::io;
pub use storage::{create_demo_graph, ID_MASK, ID_SHIFT_BITS};

#[cfg(feature = "proto_inplace")]
mod generated {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ use super::FlatMapFuncGen;
use crate::generated::gremlin as pb;
use crate::process::traversal::traverser::{Traverser, TraverserSplitIter};
use crate::structure::codec::pb_chain_to_filter;
use crate::structure::{Direction, Element, GraphElement, Label, QueryParams, Statement, ID};
use crate::structure::{
Direction, Element, GraphElement, Label, LabelId, QueryParams, Statement, ID,
};
use crate::{str_to_dyn_error, DynIter, DynResult, FromPb};
use bit_set::BitSet;
use graph_store::prelude::LabelId;
use pegasus::api::function::FlatMapFunction;
use std::sync::Arc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ use crate::process::traversal::step::util::StepSymbol;
use crate::process::traversal::step::Step;
use crate::process::traversal::traverser::{Requirement, Traverser};
use crate::structure::codec::pb_chain_to_filter;
use crate::structure::{Edge, Label, QueryParams, Vertex, ID};
use crate::structure::{Edge, Label, LabelId, QueryParams, Vertex, ID};
use crate::{FromPb, Partitioner};
use bit_set::BitSet;
use graph_store::common::LabelId;
use pegasus::BuildJobError;
use pegasus_common::downcast::*;
use prost::alloc::str::FromStr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use crate::structure::property::DynDetails;
use dyn_type::object::Primitives;
use dyn_type::Object;
pub use edge::Edge;
use graph_store::common::LabelId;
use pegasus::codec::{Decode, Encode, ReadExt, WriteExt};
use std::fmt::Debug;
use std::io;
Expand All @@ -26,6 +25,8 @@ pub use vertex::Vertex;

/// The type of either vertex or edge id
pub type ID = u128;
/// The type of LabelId defined in Runtime
pub type LabelId = u8;

/// The number of bits in an `ID`
pub const ID_BITS: usize = std::mem::size_of::<ID>() * 8;
Expand Down
2 changes: 1 addition & 1 deletion research/gaia/gremlin/gremlin_core/src/structure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod property;
use crate::generated::gremlin as pb;
use crate::structure::codec::ParseError;
use crate::FromPb;
pub use element::{Edge, Element, GraphElement, Label, Vertex, VertexOrEdge, ID, ID_BITS};
pub use element::{Edge, Element, GraphElement, Label, LabelId, Vertex, VertexOrEdge, ID, ID_BITS};
pub use filter::*;
pub use graph::*;
pub use property::{DefaultDetails, Details, DynDetails, PropId, PropKey, Token};
Expand Down