Skip to content

Commit

Permalink
Fixes the inconsistent usage of msgpack/json in graph reporter
Browse files Browse the repository at this point in the history
Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow committed Jun 9, 2023
1 parent 1ba6102 commit 1e423ee
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 41 deletions.
4 changes: 2 additions & 2 deletions analytical_engine/core/fragment/dynamic_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -1529,7 +1529,7 @@ class DynamicFragmentMutator {
v_fid = partitioner.GetPartitionId(oid);
if (modify_type == rpc::NX_ADD_NODES) {
vm_ptr_->AddVertex(std::move(oid), gid);
if (!v_data.Empty()) {
if (v_data.IsObject() && !v_data.GetObject().ObjectEmpty()) {
for (const auto& prop : v_data.GetObject()) {
if (!fragment_->schema_["vertex"].HasMember(prop.name)) {
dynamic::Value key(prop.name);
Expand Down Expand Up @@ -1606,7 +1606,7 @@ class DynamicFragmentMutator {
std::move(empty_data));
}
}
if (!e_data.Empty()) {
if (e_data.IsObject() && !e_data.GetObject().ObjectEmpty()) {
for (const auto& prop : e_data.GetObject()) {
if (!fragment_->schema_["edge"].HasMember(prop.name)) {
dynamic::Value key(prop.name);
Expand Down
9 changes: 6 additions & 3 deletions analytical_engine/core/fragment/fragment_reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ class DynamicFragmentReporter : public grape::Communicator {
}
break;
}

case rpc::HAS_EDGE: {
BOOST_LEAF_AUTO(edge_in_json, params.Get<std::string>(rpc::EDGE));
dynamic::Value edge;
Expand Down Expand Up @@ -761,7 +760,9 @@ class ArrowFragmentReporter<
}
}
// archive the start gid and nodes attribute array.
arc << gid << nodes_attr;
msgpack::sbuffer sbuf;
msgpack::pack(&sbuf, nodes_attr);
arc << gid << sbuf;
}
}

Expand Down Expand Up @@ -859,7 +860,9 @@ class ArrowFragmentReporter<
}
}
// archive the start gid and edges attributes array.
arc << gid << adj_list;
msgpack::sbuffer sbuf;
msgpack::pack(&sbuf, adj_list);
arc << gid << sbuf;
}
}

Expand Down
4 changes: 4 additions & 0 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,10 @@ def info(self):
def closed(self):
return self._closed

@property
def disconnected(self):
return self._grpc_client is None or self._disconnected

def eager(self):
return self._config_params["mode"] == "eager"

Expand Down
7 changes: 5 additions & 2 deletions python/graphscope/framework/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ class name.
# add op to dag
self._session.dag.add_op(self._op)

# create the unload op up front (instead of building it lazily in `_unload`)
# to prevent a possible segmentation fault inside the protobuf library.
self._unload_op = unload_app(self)

def __repr__(self):
s = f"graphscope.App <type: {self._app_assets.type}, algorithm: {self._app_assets.algo} "
s += f"bounded_graph: {str(self._graph)}>"
Expand Down Expand Up @@ -420,8 +424,7 @@ def _unload(self):
Returns:
:class:`graphscope.framework.app.UnloadedApp`: Evaluated in eager mode.
"""
op = unload_app(self)
return UnloadedApp(self._session, op)
return UnloadedApp(self._session, self._unload_op)


class App(object):
Expand Down
6 changes: 4 additions & 2 deletions python/graphscope/framework/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ def __init__(self, bound_app, graph, *args, **kwargs):
self._op = run_app(self._bound_app, *args, **kwargs)
self._session.dag.add_op(self._op)

# create the unload op up front so that `_unload` can reuse it
self._unload_op = dag_utils.unload_context(self)

def _check_selector(self, selector):
raise NotImplementedError()

Expand Down Expand Up @@ -279,8 +282,7 @@ def __del__(self):
pass

def _unload(self):
op = dag_utils.unload_context(self)
return UnloadedContext(self._session, op)
return UnloadedContext(self._session, self._unload_op)


class TensorContextDAGNode(BaseContextDAGNode):
Expand Down
6 changes: 4 additions & 2 deletions python/graphscope/framework/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ def __init__(
self._resolve_op(incoming_data)
self._session.dag.add_op(self._op)

# create the unload op up front so that `_unload` can reuse it
self._unload_op = dag_utils.unload_graph(self)

@property
def v_labels(self):
return self._v_labels
Expand Down Expand Up @@ -714,8 +717,7 @@ def _unload(self):
Returns:
:class:`graphscope.framework.graph.UnloadedGraph`: Evaluated in eager mode.
"""
op = dag_utils.unload_graph(self)
return UnloadedGraph(self._session, op)
return UnloadedGraph(self._session, self._unload_op)

def project(
self,
Expand Down
22 changes: 10 additions & 12 deletions python/graphscope/nx/classes/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ def get_pred_attr(self, n):
def align_node_attr_cache(self):
"""Check and align the node attr cache with node id cache"""
if self.enable_iter_cache and self.node_attr_align is False:
f = self.futures["node_attr"]
if f is not None:
start_gid, self.node_attr_cache = f.result()
if self.futures["node_attr"] is not None:
start_gid, self.node_attr_cache = self.futures["node_attr"].result()
if start_gid == self.iter_pre_gid:
# align to current node_id_cache
if self.iter_gid != self.iter_pre_gid:
Expand All @@ -129,8 +128,7 @@ def align_node_attr_cache(self):
def align_succ_cache(self):
"""Check and align the succ neighbor cache with node id cache"""
if self.enable_iter_cache and self.succ_align is False:
f = self.futures["succ"]
start_gid, self.succ_cache = f.result()
start_gid, self.succ_cache = self.futures["succ"].result()
if start_gid == self.iter_pre_gid:
if self.iter_gid != self.iter_pre_gid:
self._async_fetch_succ_cache(self.iter_gid)
Expand All @@ -143,9 +141,8 @@ def align_succ_cache(self):
def align_succ_attr_cache(self):
"""Check and align the succ neighbor attr cache with node id cache"""
if self.enable_iter_cache and self.succ_attr_align is False:
f = self.futures["succ_attr"]
if f is not None:
start_gid, self.succ_attr_cache = f.result()
if self.futures["succ_attr"] is not None:
start_gid, self.succ_attr_cache = self.futures["succ_attr"].result()
if start_gid == self.iter_pre_gid:
if self.iter_gid != self.iter_pre_gid:
self._async_fetch_succ_attr_cache(self.iter_gid)
Expand All @@ -160,8 +157,7 @@ def align_pred_cache(self):
if self.enable_iter_cache and self.pred_align is False:
if self.futures["pred"] is None:
self._async_fetch_pred_cache(self.iter_pre_gid)
f = self.futures["pred"]
start_gid, self.pred_cache = f.result()
start_gid, self.pred_cache = self.futures["pred"].result()
if start_gid == self.iter_pre_gid:
if self.iter_gid != self.iter_pre_gid:
self._async_fetch_pred_cache(self.iter_gid)
Expand All @@ -177,8 +173,7 @@ def align_pred_attr_cache(self):
if self.enable_iter_cache and self.pred_attr_align is False:
if self.futures["pred_attr"] is None:
self._async_fetch_pred_attr_cache(self.iter_pre_gid)
f = self.futures["pred_attr"]
start_gid, self.pred_attr_cache = f.result()
start_gid, self.pred_attr_cache = self.futures["pred_attr"].result()
if start_gid == self.iter_pre_gid:
if self.iter_gid != self.iter_pre_gid:
self._async_fetch_pred_attr_cache(self.iter_gid)
Expand Down Expand Up @@ -259,6 +254,9 @@ def shutdown(self):
pass
future = None

def shutdown_executor(self):
self.executor.shutdown(wait=True)

def clear(self):
"""Clear batch cache and lru cache, reset the status and warmup again"""
if self.enable_iter_cache:
Expand Down
27 changes: 13 additions & 14 deletions python/graphscope/nx/classes/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#

import copy
import threading

import orjson as json
from networkx import freeze
Expand Down Expand Up @@ -369,29 +368,29 @@ def __init__(self, incoming_graph_data=None, default_label=None, **attr):
self._saved_signature = self.signature
self._is_client_view = False

# create the unload op up front (only when an op exists) so that `__del__` can reuse it
if self.op is None:
self._unload_op = None
else:
self._unload_op = dag_utils.unload_graph(self)

def _is_gs_graph(self, incoming_graph_data):
return (
hasattr(incoming_graph_data, "graph_type")
and incoming_graph_data.graph_type == graph_def_pb2.ARROW_PROPERTY
)

def __del__(self):
if self._session.info["status"] != "active" or self._key is None:
if self._key is None or self._session.disconnected:
return

# use thread to avoid dead-lock
def _del(graph):
# cancel cache fetch future
if graph.cache.enable_iter_cache:
graph.cache.shutdown()
op = dag_utils.unload_graph(graph)
op.eval()
graph._key = None
if self.cache.enable_iter_cache:
self.cache.shutdown()
self.cache.shutdown_executor()

if not self._is_client_view:
t = threading.Thread(target=_del, args=(self,))
t.daemon = True
t.start()
if not self._is_client_view and self._unload_op is not None:
self._unload_op.eval()
self._key = None

@property
def op(self):
Expand Down
6 changes: 3 additions & 3 deletions python/graphscope/nx/tests/classes/test_graphviews.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

@pytest.mark.usefixtures("graphscope_session")
class TestReverseView(test_gvs.TestReverseView):
def setup(self):
def setup_method(self):
self.G = nx.path_graph(9, create_using=nx.DiGraph())
self.rv = self.G.reverse(copy=False)
# self.rv = nx.reverse_view(self.G)
Expand Down Expand Up @@ -67,7 +67,7 @@ def test_pickle(self):

@pytest.mark.usefixtures("graphscope_session")
class TestToDirected(test_gvs.TestToDirected):
def setup(self):
def setup_method(self):
self.G = nx.path_graph(9)
self.dv = nx.to_directed(self.G)

Expand All @@ -87,7 +87,7 @@ def test_iter(self):

@pytest.mark.usefixtures("graphscope_session")
class TestToUndirected(test_gvs.TestToUndirected):
def setup(self):
def setup_method(self):
self.DG = nx.path_graph(9, create_using=nx.DiGraph())
self.uv = nx.to_undirected(self.DG)

Expand Down
2 changes: 1 addition & 1 deletion python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ gremlinpython==3.6.3rc1
grpcio>=1.49
grpcio-tools>=1.49
kubernetes>=24.2.0
msgpack
msgpack>=1.0.5
mypy-protobuf>=3.4.0
nest_asyncio
networkx==2.8.0;python_version>="3.8"
Expand Down
1 change: 1 addition & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def parsed_package_data():
return {
"graphscope": [
"VERSION",
"proto/*.pyi",
],
}

Expand Down

0 comments on commit 1e423ee

Please sign in to comment.