Skip to content

Commit

Permalink
impl hybrid runtime vertex, add ci test for column filter push down test
Browse files Browse the repository at this point in the history
  • Loading branch information
waruto210 committed Sep 9, 2022
1 parent 48268f6 commit fcd7190
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 43 deletions.
18 changes: 17 additions & 1 deletion .github/workflows/gss.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,29 @@ jobs:
cmake .. && make -j && sudo make install && \
rm -fr /tmp/cppkafka
- name: Build GraphScope Store
- name: Build GraphScope Store With column filter push down
run: |
source ${HOME}/.bashrc
export SCCACHE_DIR=~/.cache/sccache
export RUSTC_WRAPPER=/usr/local/bin/sccache
sccache --start-server
cd ${GITHUB_WORKSPACE}/interactive_engine
mvn clean install -P groot,groot-assembly -Drust.compile.mode=debug -DskipTests -Dgroot.compile.feature="maxgraph-ffi/column_filter_push_down" --quiet
sccache --show-stats
- name: Gremlin Test
run: |
cd interactive_engine/groot-server
mvn test -P gremlin-test

- name: Build GraphScope Store
run: |
source ${HOME}/.bashrc
export SCCACHE_DIR=~/.cache/sccache
export RUSTC_WRAPPER=/usr/local/bin/sccache
cd ${GITHUB_WORKSPACE}/interactive_engine
mvn clean install -P groot,groot-assembly -Drust.compile.mode=debug -DskipTests --quiet
sccache --show-stats
Expand Down
3 changes: 3 additions & 0 deletions interactive_engine/executor/assembly/groot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ runtime_integration = { path = "../../ir/integrated" }
log = "0.3"
log4rs = "0.8.0"
tokio = { version = "1.0", features = ["macros", "sync"] }

[features]
column_filter_push_down = ["runtime_integration/column_filter_push_down"]
5 changes: 3 additions & 2 deletions interactive_engine/executor/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ set -x

MODE=$1
SKIP=$2
FEATURE=$3

if [ "$SKIP" = "true" ]; then
exit 0
Expand All @@ -18,9 +19,9 @@ fi

cd assembly;
if [ "$MODE" = "debug" ]; then
../exec.sh cargo build --workspace
../exec.sh cargo build --workspace --features="$FEATURE"
elif [ "$MODE" = "release" ]; then
../exec.sh cargo build --workspace --release
../exec.sh cargo build --workspace --release --features="$FEATURE"
else
exit 1
fi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ where

// props that will be used in futher compute
let cache_prop_ids = encode_storage_prop_keys(params.columns.as_ref())?;
let column_filter_pushdown = self.column_filter_pushdown;
// props that will be returned by storage layer
let prop_ids = if self.column_filter_pushdown {
let prop_ids = if column_filter_pushdown {
if row_filter_exists_but_not_pushdown {
// need to call filter_limit!, so get columns in row_filter and params.columns
extract_needed_columns(row_filter.as_ref(), cache_prop_ids.as_ref())?
Expand Down Expand Up @@ -144,7 +145,13 @@ where
// Each worker will scan the partitions pre-allocated in source operator. Same as follows.
partitions.as_ref(),
)
.map(move |v| to_runtime_vertex(v, cache_prop_ids.clone()));
.map(move |v| {
if column_filter_pushdown {
to_hybrid_runtime_vertex(v)
} else {
to_runtime_vertex(v, cache_prop_ids.clone())
}
});

if row_filter_exists_but_not_pushdown {
// fall back to call filter_limit! to do row filter
Expand Down Expand Up @@ -256,8 +263,9 @@ where

// props that will be used in futher compute
let cache_prop_ids = encode_storage_prop_keys(params.columns.as_ref())?;
let column_filter_pushdown = self.column_filter_pushdown;
// also need props in filter, because `filter_limit!`
let prop_ids = if self.column_filter_pushdown {
let prop_ids = if column_filter_pushdown {
extract_needed_columns(params.filter.as_ref(), cache_prop_ids.as_ref())?
} else {
// column filter not pushdown, ir assume that it can get all props locally
Expand All @@ -270,7 +278,13 @@ where

let result = store
.get_vertex_properties(si, partition_label_vertex_ids.clone(), prop_ids.as_ref())
.map(move |v| to_runtime_vertex(v, cache_prop_ids.clone()));
.map(move |v| {
if column_filter_pushdown {
to_hybrid_runtime_vertex(v)
} else {
to_runtime_vertex(v, cache_prop_ids.clone())
}
});

Ok(filter_limit!(result, filter, None))
}
Expand Down Expand Up @@ -472,6 +486,17 @@ where
Vertex::new(id, Some(label), DynDetails::lazy(details))
}

#[inline]
fn to_hybrid_runtime_vertex<V>(v: V) -> Vertex
where
V: 'static + StoreVertex,
{
let id = v.get_id() as ID;
let label = encode_runtime_v_label(&v);
let details = HybridVertexDetails::new(v);
Vertex::new(id, Some(label), DynDetails::lazy(details))
}

#[inline]
fn to_empty_vertex<V: StoreVertex>(v: &V) -> Vertex {
let id = v.get_id() as ID;
Expand Down Expand Up @@ -524,6 +549,154 @@ fn edge_trim(mut edge: Edge, columns: Option<&Vec<NameOrId>>) -> Edge {
edge
}

pub struct HybridVertexDetails<V>
where
V: StoreVertex + 'static,
{
lazy: AtomicPtr<V>,
cached: Option<HashMap<PropId, Object>>,
}

impl<V> HybridVertexDetails<V>
where
V: StoreVertex + 'static,
{
pub fn new(v: V) -> Self {
let ptr = Box::into_raw(Box::new(v));
HybridVertexDetails { lazy: AtomicPtr::new(ptr), cached: None }
}

fn get_vertex_ptr(&self) -> Option<*mut V> {
let ptr = self.lazy.load(Ordering::SeqCst);
if ptr.is_null() {
None
} else {
Some(ptr)
}
}
}

impl<V> fmt::Debug for HybridVertexDetails<V>
where
V: StoreVertex + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("gs_store LazyVertexDetails")
.field("lazy", &self.lazy)
.field("cached", &self.cached)
.finish()
}
}

impl<V> AsAny for HybridVertexDetails<V>
where
V: StoreVertex + 'static,
{
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}

fn as_any_ref(&self) -> &dyn Any {
self
}
}

impl<V> Drop for HybridVertexDetails<V>
where
V: StoreVertex + 'static,
{
fn drop(&mut self) {
let ptr = self.lazy.load(Ordering::SeqCst);
if !ptr.is_null() {
unsafe {
std::ptr::drop_in_place(ptr);
}
}
}
}

impl<V> Details for HybridVertexDetails<V>
where
V: StoreVertex + 'static,
{
fn get_property(&self, key: &NameOrId) -> Option<PropertyValue> {
if let NameOrId::Id(key) = key {
if let Some(ref cached) = self.cached {
cached
.get(&(*key as PropId))
.map(|obj| PropertyValue::Owned(obj.clone()))
} else if let Some(ptr) = self.get_vertex_ptr() {
unsafe {
(*ptr)
.get_property(*key as PropId)
.map(|prop| PropertyValue::Owned(encode_runtime_prop_val(prop)))
}
} else {
None
}
} else {
info!("Have not support getting property by prop_name in gs_store yet");
None
}
}

fn get_all_properties(&self) -> Option<HashMap<NameOrId, Object>> {
// note that column filter is pushed down, so just return all props
let lazy_props: Option<HashMap<NameOrId, Object>> = if let Some(ptr) = self.get_vertex_ptr() {
unsafe {
let props: HashMap<NameOrId, Object> = (*ptr)
.get_properties()
.map(|(prop_id, prop_val)| encode_runtime_property(prop_id, prop_val))
.collect();
if props.is_empty() {
None
} else {
Some(props)
}
}
} else {
None
};
let cached_props = if let Some(ref cached) = self.cached {
let props: HashMap<NameOrId, Object> = cached
.iter()
.map(|(k, v)| (NameOrId::Id(*k as KeyId), v.clone()))
.collect();
if props.is_empty() {
None
} else {
Some(props)
}
} else {
None
};

match (lazy_props, cached_props) {
(None, None) => None,
(Some(lazy_props), None) => Some(lazy_props),
(None, Some(cached_props)) => Some(cached_props),
(Some(mut lazy_props), Some(cached_props)) => {
lazy_props.extend(cached_props);
Some(lazy_props)
}
}
}

fn insert_property(&mut self, key: NameOrId, value: Object) {
if let NameOrId::Id(key) = key {
if let Some(ref mut cached) = self.cached {
cached.insert(key as PropId, value);
} else {
let mut props = HashMap::new();
props.insert(key as PropId, value);
self.cached = Some(props);
}
} else {
info!("Have not support insert property by prop_name in gs_store yet");
}
}
}

/// LazyVertexDetails is used for local property fetching optimization.
/// That is, the required properties will not be materialized until LazyVertexDetails need to be shuffled.
#[allow(dead_code)]
Expand Down Expand Up @@ -717,7 +890,7 @@ fn extract_needed_columns(
// Some(vec[]) means need all props, so can't merge it with props needed in filter
if let Some(out_columns) = out_columns {
if out_columns.is_empty() {
return Ok(Some(Vec::with_capacity(0)))
return Ok(Some(Vec::with_capacity(0)));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,49 @@
use std::convert::{TryFrom, TryInto};

use global_query::store_api::condition::Operand as StoreOperand;
use global_query::store_api::{
condition::predicate::CmpOperator as StoreOprator,
condition::predicate::PredCondition as StorePredCondition,
};
use global_query::store_api::{Condition, ConditionBuilder};
use global_query::store_api::{prelude::Property, Condition, ConditionBuilder, PropId};
use ir_common::generated::common as common_pb;
use ir_common::NameOrId;

use crate::apis::PropKey;
use crate::utils::expr::eval::Operand;
use crate::utils::expr::eval_pred::{PEvaluator, Predicate, Predicates};
use crate::{GraphProxyError, GraphProxyResult};

impl Operand {
/// only get the PropId, else None
pub(crate) fn get_var_prop_id(&self) -> GraphProxyResult<PropId> {
match self {
Operand::Var { tag: None, prop_key: Some(prop_key) } => match prop_key {
PropKey::Key(NameOrId::Id(id)) => Ok(*id as PropId),
_ => Err(GraphProxyError::UnSupported(format!("var error {:?}", self))),
},
_ => Err(GraphProxyError::FilterPushDownError(format!("not a var {:?}", self))),
}
}

pub(crate) fn to_store_oprand(&self) -> GraphProxyResult<StoreOperand> {
match self {
Operand::Var { tag: None, prop_key: Some(prop_key) } => match prop_key {
PropKey::Key(NameOrId::Id(id)) => Ok(StoreOperand::PropId(*id as PropId)),
PropKey::Label => Ok(StoreOperand::Label),
PropKey::Id => Ok(StoreOperand::Id),
_ => Err(GraphProxyError::FilterPushDownError(format!("var error {:?}", self))),
},
Operand::Const(obj) => {
let prop = Property::from_borrow_object(obj.as_borrow())
.map_err(|e| GraphProxyError::FilterPushDownError(format!("{:?}", e)));
prop.map(StoreOperand::Const)
}
_ => Err(GraphProxyError::FilterPushDownError(format!("not a var {:?}", self))),
}
}
}

impl TryFrom<&Predicate> for StorePredCondition {
type Error = GraphProxyError;

Expand Down
33 changes: 0 additions & 33 deletions interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use std::convert::{TryFrom, TryInto};
use dyn_type::arith::Exp;
use dyn_type::object;
use dyn_type::{BorrowObject, Object};
use global_query::store_api::condition::Operand as StoreOperand;
use global_query::store_api::{prelude::Property, PropId};
use ir_common::error::{ParsePbError, ParsePbResult};
use ir_common::expr_parse::to_suffix_expr;
use ir_common::generated::common as common_pb;
Expand All @@ -31,7 +29,6 @@ use ir_common::{NameOrId, ALL_KEY, ID_KEY, LABEL_KEY, LENGTH_KEY};
use crate::apis::{Details, Element, PropKey};
use crate::utils::expr::eval_pred::EvalPred;
use crate::utils::expr::{ExprEvalError, ExprEvalResult};
use crate::{GraphProxyError, GraphProxyResult};

/// The trait to define evaluating an expression
pub trait Evaluate {
Expand All @@ -57,36 +54,6 @@ pub enum Operand {
VarMap(Vec<Operand>),
}

impl Operand {
/// only get the PropId, else None
pub(crate) fn get_var_prop_id(&self) -> GraphProxyResult<PropId> {
match self {
Operand::Var { tag: None, prop_key: Some(prop_key) } => match prop_key {
PropKey::Key(NameOrId::Id(id)) => Ok(*id as PropId),
_ => Err(GraphProxyError::UnSupported(format!("var error {:?}", self))),
},
_ => Err(GraphProxyError::FilterPushDownError(format!("not a var {:?}", self))),
}
}

pub(crate) fn to_store_oprand(&self) -> GraphProxyResult<StoreOperand> {
match self {
Operand::Var { tag: None, prop_key: Some(prop_key) } => match prop_key {
PropKey::Key(NameOrId::Id(id)) => Ok(StoreOperand::PropId(*id as PropId)),
PropKey::Label => Ok(StoreOperand::Label),
PropKey::Id => Ok(StoreOperand::Id),
_ => Err(GraphProxyError::FilterPushDownError(format!("var error {:?}", self))),
},
Operand::Const(obj) => {
let prop = Property::from_borrow_object(obj.as_borrow())
.map_err(|e| GraphProxyError::FilterPushDownError(format!("{:?}", e)));
prop.map(StoreOperand::Const)
}
_ => Err(GraphProxyError::FilterPushDownError(format!("not a var {:?}", self))),
}
}
}

/// An inner representation of `common_pb::ExprOpr` for one-shot translation of `common_pb::ExprOpr`.
#[derive(Debug, Clone)]
pub(crate) enum InnerOpr {
Expand Down
Loading

0 comments on commit fcd7190

Please sign in to comment.