Skip to content

Commit

Permalink
impl hybrid runtime vertex
Browse files Browse the repository at this point in the history
  • Loading branch information
waruto210 committed Sep 9, 2022
1 parent 48268f6 commit 254810c
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::apis::{
from_fn, register_graph, Details, Direction, DynDetails, Edge, PropertyValue, QueryParams, ReadGraph,
Statement, Vertex, ID,
};
use crate::utils::expr::eval_pred::{EvalPred, PEvaluator};
use crate::utils::expr::eval_pred::{zip_option_vecs, EvalPred, PEvaluator};
use crate::{filter_limit, filter_sample_limit, limit_n, sample_limit};
use crate::{GraphProxyError, GraphProxyResult};

Expand Down Expand Up @@ -110,8 +110,9 @@ where

// props that will be used in further compute
let cache_prop_ids = encode_storage_prop_keys(params.columns.as_ref())?;
let column_filter_pushdown = self.column_filter_pushdown;
// props that will be returned by storage layer
let prop_ids = if self.column_filter_pushdown {
let prop_ids = if column_filter_pushdown {
if row_filter_exists_but_not_pushdown {
// need to call filter_limit!, so get columns in row_filter and params.columns
extract_needed_columns(row_filter.as_ref(), cache_prop_ids.as_ref())?
Expand Down Expand Up @@ -144,7 +145,13 @@ where
// Each worker will scan the partitions pre-allocated in source operator. Same as follows.
partitions.as_ref(),
)
.map(move |v| to_runtime_vertex(v, cache_prop_ids.clone()));
.map(move |v| {
    // Each arm must be a tail expression (no trailing `;`) so that the closure
    // evaluates to the converted `Vertex` instead of `()`.
    if column_filter_pushdown {
        // The storage layer already applied the column filter; wrap the store vertex lazily.
        to_hybrid_runtime_vertex(v)
    } else {
        to_runtime_vertex(v, cache_prop_ids.clone())
    }
});

if row_filter_exists_but_not_pushdown {
// fall back to call filter_limit! to do row filter
Expand Down Expand Up @@ -256,8 +263,9 @@ where

// props that will be used in further compute
let cache_prop_ids = encode_storage_prop_keys(params.columns.as_ref())?;
let column_filter_pushdown = self.column_filter_pushdown;
// also need props in filter, because `filter_limit!`
let prop_ids = if self.column_filter_pushdown {
let prop_ids = if column_filter_pushdown {
extract_needed_columns(params.filter.as_ref(), cache_prop_ids.as_ref())?
} else {
// column filter not pushdown, ir assume that it can get all props locally
Expand All @@ -270,7 +278,13 @@ where

let result = store
.get_vertex_properties(si, partition_label_vertex_ids.clone(), prop_ids.as_ref())
.map(move |v| to_runtime_vertex(v, cache_prop_ids.clone()));
.map(move |v| {
    // Each arm must be a tail expression (no trailing `;`) so that the closure
    // evaluates to the converted `Vertex` instead of `()`.
    if column_filter_pushdown {
        // The storage layer already applied the column filter; wrap the store vertex lazily.
        to_hybrid_runtime_vertex(v)
    } else {
        to_runtime_vertex(v, cache_prop_ids.clone())
    }
});

Ok(filter_limit!(result, filter, None))
}
Expand Down Expand Up @@ -472,6 +486,17 @@ where
Vertex::new(id, Some(label), DynDetails::lazy(details))
}

/// Converts a store vertex into a runtime `Vertex` whose details are a
/// `HybridVertexDetails` wrapping the store vertex itself.
///
/// The id and the encoded label are read from `v` before `v` is moved into the
/// details object.
#[inline]
fn to_hybrid_runtime_vertex<V>(v: V) -> Vertex
where
    V: 'static + StoreVertex,
{
    let runtime_label = encode_runtime_v_label(&v);
    let runtime_id = v.get_id() as ID;
    Vertex::new(runtime_id, Some(runtime_label), DynDetails::lazy(HybridVertexDetails::new(v)))
}

#[inline]
fn to_empty_vertex<V: StoreVertex>(v: &V) -> Vertex {
let id = v.get_id() as ID;
Expand Down Expand Up @@ -524,6 +549,151 @@ fn edge_trim(mut edge: Edge, columns: Option<&Vec<NameOrId>>) -> Edge {
edge
}

/// Vertex details that combine a store vertex with a map of locally inserted properties.
pub struct HybridVertexDetails<V: StoreVertex + 'static> {
    /// Raw pointer to the heap-allocated store vertex (produced via `Box::into_raw` in `new`).
    lazy: AtomicPtr<V>,
    /// Properties added through `insert_property`; `None` until the first insertion.
    cached: Option<HashMap<PropId, Object>>,
}

impl<V> HybridVertexDetails<V>
where
    V: StoreVertex + 'static,
{
    /// Creates the details object, taking ownership of the store vertex `v`.
    ///
    /// The vertex is moved onto the heap and kept behind a raw pointer; no
    /// properties are cached initially.
    pub fn new(v: V) -> Self {
        let ptr = Box::into_raw(Box::new(v));
        HybridVertexDetails { lazy: AtomicPtr::new(ptr), cached: None }
    }

    /// Returns the raw pointer to the wrapped store vertex, or `None` if the
    /// stored pointer is null.
    fn get_vertex_ptr(&self) -> Option<*mut V> {
        // The pointer lives in the `lazy` field (this struct has no `inner` field).
        let ptr = self.lazy.load(Ordering::SeqCst);
        if ptr.is_null() {
            None
        } else {
            Some(ptr)
        }
    }
}

impl<V> fmt::Debug for HybridVertexDetails<V>
where
    V: StoreVertex + 'static,
{
    /// Formats the details as a debug struct showing the `lazy` pointer and the
    /// `cached` property map.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Label the output with this type's own name so it cannot be confused
        // with `LazyVertexDetails` in logs.
        f.debug_struct("gs_store HybridVertexDetails")
            .field("lazy", &self.lazy)
            .field("cached", &self.cached)
            .finish()
    }
}

/// Exposes `HybridVertexDetails` as `dyn Any` for callers that work through `AsAny`.
impl<V: StoreVertex + 'static> AsAny for HybridVertexDetails<V> {
    /// Shared view of `self` as `dyn Any`.
    fn as_any_ref(&self) -> &dyn Any {
        self
    }

    /// Exclusive view of `self` as `dyn Any`.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

impl<V> Drop for HybridVertexDetails<V>
where
    V: StoreVertex + 'static,
{
    /// Releases the heap-allocated store vertex, if any.
    ///
    /// `ptr::drop_in_place` alone would only run `V`'s destructor and leak the
    /// allocation; rebuilding the `Box` both drops the value and frees the memory.
    fn drop(&mut self) {
        let ptr = self.lazy.load(Ordering::SeqCst);
        if !ptr.is_null() {
            // SAFETY: a non-null `lazy` pointer was produced by `Box::into_raw` in
            // `new`, and `drop` runs at most once with exclusive access, so the
            // allocation is reclaimed exactly once.
            unsafe {
                drop(Box::from_raw(ptr));
            }
        }
    }
}

impl<V> Details for HybridVertexDetails<V>
where
    V: StoreVertex + 'static,
{
    /// Looks up a single property by id.
    ///
    /// Locally inserted (cached) properties take precedence; if the key is not
    /// cached, the lookup falls back to the wrapped store vertex. Lookup by
    /// property name is not supported and yields `None`.
    fn get_property(&self, key: &NameOrId) -> Option<PropertyValue> {
        if let NameOrId::Id(key) = key {
            let prop_id = *key as PropId;
            let cached_val = self
                .cached
                .as_ref()
                .and_then(|cached| cached.get(&prop_id))
                .map(|obj| PropertyValue::Owned(obj.clone()));
            if cached_val.is_some() {
                return cached_val;
            }
            // Not cached: consult the store vertex so that inserting one property
            // does not hide the properties fetched from storage.
            if let Some(ptr) = self.get_vertex_ptr() {
                // SAFETY: a non-null pointer was created by `Box::into_raw` in `new`
                // and, as far as this impl shows, is only released in `Drop`.
                unsafe {
                    (*ptr)
                        .get_property(prop_id)
                        .map(|prop| PropertyValue::Owned(encode_runtime_prop_val(prop)))
                }
            } else {
                None
            }
        } else {
            info!("Have not support getting property by prop_name in gs_store yet");
            None
        }
    }

    /// Returns every property of the vertex, or `None` when there are none.
    ///
    /// Properties of the store vertex are collected first and then overlaid with
    /// the cached ones, so a cached value wins when both define the same key.
    fn get_all_properties(&self) -> Option<HashMap<NameOrId, Object>> {
        // note that column filter is pushed down, so just return all props
        let mut all_props: HashMap<NameOrId, Object> = HashMap::new();
        if let Some(ptr) = self.get_vertex_ptr() {
            // SAFETY: a non-null pointer was created by `Box::into_raw` in `new`
            // and, as far as this impl shows, is only released in `Drop`.
            unsafe {
                all_props.extend(
                    (*ptr)
                        .get_properties()
                        .map(|(prop_id, prop_val)| encode_runtime_property(prop_id, prop_val)),
                );
            }
        }
        if let Some(ref cached) = self.cached {
            all_props.extend(
                cached
                    .iter()
                    .map(|(k, v)| (NameOrId::Id(*k as KeyId), v.clone())),
            );
        }

        if all_props.is_empty() {
            None
        } else {
            Some(all_props)
        }
    }

    /// Stores `value` under `key` in the local cache, creating the cache on first use.
    ///
    /// Insertion by property name is not supported and is only logged.
    fn insert_property(&mut self, key: NameOrId, value: Object) {
        if let NameOrId::Id(key) = key {
            self.cached
                .get_or_insert_with(HashMap::new)
                .insert(key as PropId, value);
        } else {
            info!("Have not support insert property by prop_name in gs_store yet");
        }
    }
}

/// LazyVertexDetails is used for local property fetching optimization.
/// That is, the required properties will not be materialized until the LazyVertexDetails needs to be shuffled.
#[allow(dead_code)]
Expand Down Expand Up @@ -717,7 +887,7 @@ fn extract_needed_columns(
// Some(vec[]) means need all props, so can't merge it with props needed in filter
if let Some(out_columns) = out_columns {
if out_columns.is_empty() {
return Ok(Some(Vec::with_capacity(0)))
return Ok(Some(Vec::with_capacity(0)));
}
}

Expand Down
2 changes: 1 addition & 1 deletion interactive_engine/executor/ir/integrated/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dyn_type = {path = "../../common/dyn_type"}
global_query = {path = "../../store/global_query"}

[features]
default = []
default = ["column_filter_push_down"]
proto_inplace = ["ir_common/proto_inplace", "pegasus_server/gcip"]
with_v6d = ["runtime/with_v6d"]
column_filter_push_down = []

0 comments on commit 254810c

Please sign in to comment.