Skip to content

Commit

Permalink
Fixes the inconsistent usage of msgpack/json in graph reporter
Browse files Browse the repository at this point in the history
Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow committed Jun 9, 2023
1 parent 1ba6102 commit d8f1aef
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 21 deletions.
4 changes: 2 additions & 2 deletions analytical_engine/core/fragment/dynamic_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -1529,7 +1529,7 @@ class DynamicFragmentMutator {
v_fid = partitioner.GetPartitionId(oid);
if (modify_type == rpc::NX_ADD_NODES) {
vm_ptr_->AddVertex(std::move(oid), gid);
if (!v_data.Empty()) {
if (v_data.IsObject() && !v_data.GetObject().ObjectEmpty()) {
for (const auto& prop : v_data.GetObject()) {
if (!fragment_->schema_["vertex"].HasMember(prop.name)) {
dynamic::Value key(prop.name);
Expand Down Expand Up @@ -1606,7 +1606,7 @@ class DynamicFragmentMutator {
std::move(empty_data));
}
}
if (!e_data.Empty()) {
if (e_data.IsObject() && !e_data.GetObject().ObjectEmpty()) {
for (const auto& prop : e_data.GetObject()) {
if (!fragment_->schema_["edge"].HasMember(prop.name)) {
dynamic::Value key(prop.name);
Expand Down
9 changes: 6 additions & 3 deletions analytical_engine/core/fragment/fragment_reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ class DynamicFragmentReporter : public grape::Communicator {
}
break;
}

case rpc::HAS_EDGE: {
BOOST_LEAF_AUTO(edge_in_json, params.Get<std::string>(rpc::EDGE));
dynamic::Value edge;
Expand Down Expand Up @@ -761,7 +760,9 @@ class ArrowFragmentReporter<
}
}
// archive the start gid and nodes attribute array.
arc << gid << nodes_attr;
msgpack::sbuffer sbuf;
msgpack::pack(&sbuf, nodes_attr);
arc << gid << sbuf;
}
}

Expand Down Expand Up @@ -859,7 +860,9 @@ class ArrowFragmentReporter<
}
}
// archive the start gid and edges attributes array.
arc << gid << adj_list;
msgpack::sbuffer sbuf;
msgpack::pack(&sbuf, adj_list);
arc << gid << sbuf;
}
}

Expand Down
19 changes: 7 additions & 12 deletions python/graphscope/nx/classes/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ def get_pred_attr(self, n):
def align_node_attr_cache(self):
"""Check and align the node attr cache with node id cache"""
if self.enable_iter_cache and self.node_attr_align is False:
f = self.futures["node_attr"]
if f is not None:
start_gid, self.node_attr_cache = f.result()
if self.futures["node_attr"] is not None:
start_gid, self.node_attr_cache = self.futures["node_attr"].result()
if start_gid == self.iter_pre_gid:
# align to current node_id_cache
if self.iter_gid != self.iter_pre_gid:
Expand All @@ -129,8 +128,7 @@ def align_node_attr_cache(self):
def align_succ_cache(self):
"""Check and align the succ neighbor cache with node id cache"""
if self.enable_iter_cache and self.succ_align is False:
f = self.futures["succ"]
start_gid, self.succ_cache = f.result()
start_gid, self.succ_cache = self.futures["succ"].result()
if start_gid == self.iter_pre_gid:
if self.iter_gid != self.iter_pre_gid:
self._async_fetch_succ_cache(self.iter_gid)
Expand All @@ -143,9 +141,8 @@ def align_succ_cache(self):
def align_succ_attr_cache(self):
"""Check and align the succ neighbor attr cache with node id cache"""
if self.enable_iter_cache and self.succ_attr_align is False:
f = self.futures["succ_attr"]
if f is not None:
start_gid, self.succ_attr_cache = f.result()
if self.futures["succ_attr"] is not None:
start_gid, self.succ_attr_cache = self.futures["succ_attr"].result()
if start_gid == self.iter_pre_gid:
if self.iter_gid != self.iter_pre_gid:
self._async_fetch_succ_attr_cache(self.iter_gid)
Expand All @@ -160,8 +157,7 @@ def align_pred_cache(self):
if self.enable_iter_cache and self.pred_align is False:
if self.futures["pred"] is None:
self._async_fetch_pred_cache(self.iter_pre_gid)
f = self.futures["pred"]
start_gid, self.pred_cache = f.result()
start_gid, self.pred_cache = self.futures["pred"].result()
if start_gid == self.iter_pre_gid:
if self.iter_gid != self.iter_pre_gid:
self._async_fetch_pred_cache(self.iter_gid)
Expand All @@ -177,8 +173,7 @@ def align_pred_attr_cache(self):
if self.enable_iter_cache and self.pred_attr_align is False:
if self.futures["pred_attr"] is None:
self._async_fetch_pred_attr_cache(self.iter_pre_gid)
f = self.futures["pred_attr"]
start_gid, self.pred_attr_cache = f.result()
start_gid, self.pred_attr_cache = self.futures["pred_attr"].result()
if start_gid == self.iter_pre_gid:
if self.iter_gid != self.iter_pre_gid:
self._async_fetch_pred_attr_cache(self.iter_gid)
Expand Down
6 changes: 3 additions & 3 deletions python/graphscope/nx/tests/classes/test_graphviews.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

@pytest.mark.usefixtures("graphscope_session")
class TestReverseView(test_gvs.TestReverseView):
def setup(self):
def setup_method(self):
self.G = nx.path_graph(9, create_using=nx.DiGraph())
self.rv = self.G.reverse(copy=False)
# self.rv = nx.reverse_view(self.G)
Expand Down Expand Up @@ -67,7 +67,7 @@ def test_pickle(self):

@pytest.mark.usefixtures("graphscope_session")
class TestToDirected(test_gvs.TestToDirected):
def setup(self):
def setup_method(self):
self.G = nx.path_graph(9)
self.dv = nx.to_directed(self.G)

Expand All @@ -87,7 +87,7 @@ def test_iter(self):

@pytest.mark.usefixtures("graphscope_session")
class TestToUndirected(test_gvs.TestToUndirected):
def setup(self):
def setup_method(self):
self.DG = nx.path_graph(9, create_using=nx.DiGraph())
self.uv = nx.to_undirected(self.DG)

Expand Down
2 changes: 1 addition & 1 deletion python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ gremlinpython==3.6.3rc1
grpcio>=1.49
grpcio-tools>=1.49
kubernetes>=24.2.0
msgpack
msgpack>=1.0.5
mypy-protobuf>=3.4.0
nest_asyncio
networkx==2.8.0;python_version>="3.8"
Expand Down
1 change: 1 addition & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def parsed_package_data():
return {
"graphscope": [
"VERSION",
"proto/*.pyi",
],
}

Expand Down

0 comments on commit d8f1aef

Please sign in to comment.