From d8f1aef4d75244a90e933d5c53d86bb40a4388c1 Mon Sep 17 00:00:00 2001 From: Tao He Date: Fri, 9 Jun 2023 09:09:36 +0800 Subject: [PATCH] Fixes the inconsistent usage of msgpack/json in graph reporter Signed-off-by: Tao He --- .../core/fragment/dynamic_fragment.h | 4 ++-- .../core/fragment/fragment_reporter.h | 9 ++++++--- python/graphscope/nx/classes/cache.py | 19 +++++++------------ .../nx/tests/classes/test_graphviews.py | 6 +++--- python/requirements.txt | 2 +- python/setup.py | 1 + 6 files changed, 20 insertions(+), 21 deletions(-) diff --git a/analytical_engine/core/fragment/dynamic_fragment.h b/analytical_engine/core/fragment/dynamic_fragment.h index 5cea4fe47686..1fc009665707 100644 --- a/analytical_engine/core/fragment/dynamic_fragment.h +++ b/analytical_engine/core/fragment/dynamic_fragment.h @@ -1529,7 +1529,7 @@ class DynamicFragmentMutator { v_fid = partitioner.GetPartitionId(oid); if (modify_type == rpc::NX_ADD_NODES) { vm_ptr_->AddVertex(std::move(oid), gid); - if (!v_data.Empty()) { + if (v_data.IsObject() && !v_data.GetObject().ObjectEmpty()) { for (const auto& prop : v_data.GetObject()) { if (!fragment_->schema_["vertex"].HasMember(prop.name)) { dynamic::Value key(prop.name); @@ -1606,7 +1606,7 @@ class DynamicFragmentMutator { std::move(empty_data)); } } - if (!e_data.Empty()) { + if (e_data.IsObject() && !e_data.GetObject().ObjectEmpty()) { for (const auto& prop : e_data.GetObject()) { if (!fragment_->schema_["edge"].HasMember(prop.name)) { dynamic::Value key(prop.name); diff --git a/analytical_engine/core/fragment/fragment_reporter.h b/analytical_engine/core/fragment/fragment_reporter.h index ad3f78156f1c..d055d6d0cf2d 100644 --- a/analytical_engine/core/fragment/fragment_reporter.h +++ b/analytical_engine/core/fragment/fragment_reporter.h @@ -105,7 +105,6 @@ class DynamicFragmentReporter : public grape::Communicator { } break; } - case rpc::HAS_EDGE: { BOOST_LEAF_AUTO(edge_in_json, params.Get(rpc::EDGE)); dynamic::Value edge; @@ -761,7 +760,9 @@ class ArrowFragmentReporter< } } // archive the start gid and nodes attribute array. - arc << gid << nodes_attr; + msgpack::sbuffer sbuf; + msgpack::pack(&sbuf, nodes_attr); + arc << gid << sbuf; } } @@ -859,7 +860,9 @@ class ArrowFragmentReporter< } } // archive the start gid and edges attributes array. - arc << gid << adj_list; + msgpack::sbuffer sbuf; + msgpack::pack(&sbuf, adj_list); + arc << gid << sbuf; } } diff --git a/python/graphscope/nx/classes/cache.py b/python/graphscope/nx/classes/cache.py index d6012ee4bc58..4028be8f654b 100644 --- a/python/graphscope/nx/classes/cache.py +++ b/python/graphscope/nx/classes/cache.py @@ -113,9 +113,8 @@ def get_pred_attr(self, n): def align_node_attr_cache(self): """Check and align the node attr cache with node id cache""" if self.enable_iter_cache and self.node_attr_align is False: - f = self.futures["node_attr"] - if f is not None: - start_gid, self.node_attr_cache = f.result() + if self.futures["node_attr"] is not None: + start_gid, self.node_attr_cache = self.futures["node_attr"].result() if start_gid == self.iter_pre_gid: # align to current node_id_cache if self.iter_gid != self.iter_pre_gid: @@ -129,8 +128,7 @@ def align_node_attr_cache(self): def align_succ_cache(self): """Check and align the succ neighbor cache with node id cache""" if self.enable_iter_cache and self.succ_align is False: - f = self.futures["succ"] - start_gid, self.succ_cache = f.result() + start_gid, self.succ_cache = self.futures["succ"].result() if start_gid == self.iter_pre_gid: if self.iter_gid != self.iter_pre_gid: self._async_fetch_succ_cache(self.iter_gid) @@ -143,9 +141,8 @@ def align_succ_cache(self): def align_succ_attr_cache(self): """Check and align the succ neighbor attr cache with node id cache""" if self.enable_iter_cache and self.succ_attr_align is False: - f = self.futures["succ_attr"] - if f is not None: - start_gid, self.succ_attr_cache = f.result() + if self.futures["succ_attr"] is not None: + start_gid, self.succ_attr_cache = self.futures["succ_attr"].result() if start_gid == self.iter_pre_gid: if self.iter_gid != self.iter_pre_gid: self._async_fetch_succ_attr_cache(self.iter_gid) @@ -160,8 +157,7 @@ def align_pred_cache(self): if self.enable_iter_cache and self.pred_align is False: if self.futures["pred"] is None: self._async_fetch_pred_cache(self.iter_pre_gid) - f = self.futures["pred"] - start_gid, self.pred_cache = f.result() + start_gid, self.pred_cache = self.futures["pred"].result() if start_gid == self.iter_pre_gid: if self.iter_gid != self.iter_pre_gid: self._async_fetch_pred_cache(self.iter_gid) @@ -177,8 +173,7 @@ def align_pred_attr_cache(self): if self.enable_iter_cache and self.pred_attr_align is False: if self.futures["pred_attr"] is None: self._async_fetch_pred_attr_cache(self.iter_pre_gid) - f = self.futures["pred_attr"] - start_gid, self.pred_attr_cache = f.result() + start_gid, self.pred_attr_cache = self.futures["pred_attr"].result() if start_gid == self.iter_pre_gid: if self.iter_gid != self.iter_pre_gid: self._async_fetch_pred_attr_cache(self.iter_gid) diff --git a/python/graphscope/nx/tests/classes/test_graphviews.py b/python/graphscope/nx/tests/classes/test_graphviews.py index 0aaffadda3c8..455154ab093a 100644 --- a/python/graphscope/nx/tests/classes/test_graphviews.py +++ b/python/graphscope/nx/tests/classes/test_graphviews.py @@ -29,7 +29,7 @@ @pytest.mark.usefixtures("graphscope_session") class TestReverseView(test_gvs.TestReverseView): - def setup(self): + def setup_method(self): self.G = nx.path_graph(9, create_using=nx.DiGraph()) self.rv = self.G.reverse(copy=False) # self.rv = nx.reverse_view(self.G) @@ -67,7 +67,7 @@ def test_pickle(self): @pytest.mark.usefixtures("graphscope_session") class TestToDirected(test_gvs.TestToDirected): - def setup(self): + def setup_method(self): self.G = nx.path_graph(9) self.dv = nx.to_directed(self.G) @@ -87,7 +87,7 @@ def test_iter(self): @pytest.mark.usefixtures("graphscope_session") class TestToUndirected(test_gvs.TestToUndirected): - def setup(self): + def setup_method(self): self.DG = nx.path_graph(9, create_using=nx.DiGraph()) self.uv = nx.to_undirected(self.DG) diff --git a/python/requirements.txt b/python/requirements.txt index c25860bc1950..04bbb677676b 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -3,7 +3,7 @@ gremlinpython==3.6.3rc1 grpcio>=1.49 grpcio-tools>=1.49 kubernetes>=24.2.0 -msgpack +msgpack>=1.0.5 mypy-protobuf>=3.4.0 nest_asyncio networkx==2.8.0;python_version>="3.8" diff --git a/python/setup.py b/python/setup.py index 7a06946a86e1..827beca0d973 100644 --- a/python/setup.py +++ b/python/setup.py @@ -193,6 +193,7 @@ def parsed_package_data(): return { "graphscope": [ "VERSION", + "proto/*.pyi", ], }