diff --git a/Makefile b/Makefile index e6247173dacc..973ad1753267 100644 --- a/Makefile +++ b/Makefile @@ -186,6 +186,6 @@ clean: rm -fr $(WORKING_DIR)/learning_engine/graph-learn/proto/*.h || true && \ rm -fr $(WORKING_DIR)/learning_engine/graph-learn/proto/*.cc || true && \ rm -fr $(WORKING_DIR)/interactive_engine/executor/target || true && \ - rm -fr $(WORKING_DIR)/interactive_engine/assembly/target || true + rm -fr $(WORKING_DIR)/interactive_engine/assembly/target || true && \ cd $(WORKING_DIR)/python && python3 setup.py clean --all && \ cd $(WORKING_DIR)/coordinator && python3 setup.py clean --all diff --git a/analytical_engine/core/fragment/dynamic_fragment.h b/analytical_engine/core/fragment/dynamic_fragment.h index 51e14e6c7bfd..e5956aec8841 100644 --- a/analytical_engine/core/fragment/dynamic_fragment.h +++ b/analytical_engine/core/fragment/dynamic_fragment.h @@ -36,6 +36,7 @@ #include "core/object/dynamic.h" #include "core/utils/partitioner.h" +#include "core/utils/transform_utils.h" #include "proto/graphscope/proto/types.pb.h" namespace gs { @@ -122,7 +123,7 @@ class DynamicFragment init(fid, directed); load_strategy_ = directed ? grape::LoadStrategy::kBothOutIn - : grape::LoadStrategy::kOnlyIn; + : grape::LoadStrategy::kOnlyOut; ovnum_ = 0; static constexpr vid_t invalid_vid = std::numeric_limits::max(); @@ -162,53 +163,83 @@ class DynamicFragment } } - alive_ivnum_ = ivnum_; - alive_ovnum_ = ovnum_; - iv_alive_.init(ivnum_); - ov_alive_.init(ovnum_); - for (size_t i = 0; i < ivnum_; i++) { - iv_alive_.set_bit(i); - } - for (size_t i = 0; i < ovnum_; i++) { - ov_alive_.set_bit(i); - } - is_selfloops_.init(ivnum_); - - this->inner_vertices_.SetRange(0, ivnum_); - this->outer_vertices_.SetRange(id_parser_.max_local_id() - ovnum_, - id_parser_.max_local_id()); - this->vertices_.SetRange(0, ivnum_, id_parser_.max_local_id() - ovnum_, - id_parser_.max_local_id()); + initVertexMembersOfFragment(); initOuterVerticesOfFragment(); buildCSR(this->InnerVertices(), edges, load_strategy_); ivdata_.clear(); ivdata_.resize(ivnum_, dynamic::Value(rapidjson::kObjectType)); - ovdata_.clear(); - ovdata_.resize(ovnum_); if (sizeof(internal_vertex_t) > sizeof(vid_t)) { for (auto& v : vertices) { vid_t gid = v.vid; if (id_parser_.get_fragment_id(gid) == fid_) { - ivdata_[id_parser_.get_local_id(gid)].Update(v.vdata); - } else { - auto iter = ovg2i_.find(gid); - if (iter != ovg2i_.end()) { - auto index = outerVertexLidToIndex(iter->second); - ovdata_[index] = std::move(v.vdata); - } + ivdata_[id_parser_.get_local_id(gid)] = std::move(v.vdata); } } } } + // Init an empty fragment. void Init(fid_t fid, bool directed) { std::vector empty_vertices; std::vector empty_edges; Init(fid, directed, empty_vertices, empty_edges); } + // Init fragment from arrow property fragment. + void Init(fid_t fid, bool directed, + std::vector>& vertices, + std::vector>& edges, + std::vector& oe_degree, std::vector& ie_degree, + uint32_t thread_num) { + init(fid, directed); + load_strategy_ = directed ? 
grape::LoadStrategy::kBothOutIn + : grape::LoadStrategy::kOnlyOut; + + ovnum_ = 0; + if (load_strategy_ == grape::LoadStrategy::kOnlyOut) { + for (auto& vec : edges) { + for (auto& e : vec) { + if (!IsInnerVertexGid(e.dst)) { + parseOrAddOuterVertexGid(e.dst); + } + } + } + } else if (load_strategy_ == grape::LoadStrategy::kBothOutIn) { + for (auto& vec : edges) { + for (auto& e : vec) { + if (IsInnerVertexGid(e.src)) { + if (!IsInnerVertexGid(e.dst)) { + parseOrAddOuterVertexGid(e.dst); + } + } else { + parseOrAddOuterVertexGid(e.src); + } + } + } + } + + initVertexMembersOfFragment(); + initOuterVerticesOfFragment(); + + buildCSRParallel(edges, oe_degree, ie_degree, thread_num); + + ivdata_.clear(); + ivdata_.resize(ivnum_); + // process vertices data parallel + if (sizeof(internal_vertex_t) > sizeof(vid_t)) { + parallel_for( + vertices.begin(), vertices.end(), + [&](uint32_t tid, std::vector& vs) { + for (auto& v : vs) { + ivdata_[v.vid] = std::move(v.vdata); + } + }, + thread_num, 1); + } + } + using base_t::Gid2Lid; using base_t::ie_; using base_t::oe_; @@ -321,7 +352,6 @@ class DynamicFragment id_parser_.max_local_id()); } ivdata_.resize(this->ivnum_, dynamic::Value(rapidjson::kObjectType)); - ovdata_.resize(this->ovnum_); iv_alive_.resize(this->ivnum_); ov_alive_.resize(this->ovnum_); alive_ovnum_ = this->ovnum_; @@ -337,7 +367,6 @@ class DynamicFragment } else { if (this->OuterVertexGid2Lid(v.vid, lid)) { auto index = outerVertexLidToIndex(lid); - ovdata_[index] = std::move(v.vdata); if (ov_alive_.get_bit(index) == false) { ov_alive_.set_bit(index); } @@ -349,10 +378,6 @@ class DynamicFragment if (IsInnerVertexGid(v.vid)) { this->InnerVertexGid2Lid(v.vid, lid); ivdata_[lid] = std::move(v.vdata); - } else { - if (this->OuterVertexGid2Lid(v.vid, lid)) { - ovdata_[outerVertexLidToIndex(lid)] = std::move(v.vdata); - } } } } @@ -383,16 +408,13 @@ class DynamicFragment vid_t GetOuterVerticesNum() const { return alive_ovnum_; } inline const vdata_t& GetData(const vertex_t& v) const override { - return IsInnerVertex(v) ? 
ivdata_[v.GetValue()] - : ovdata_[outerVertexLidToIndex(v.GetValue())]; + CHECK(IsInnerVertex(v)); + return ivdata_[v.GetValue()]; } inline void SetData(const vertex_t& v, const vdata_t& val) override { - if (IsInnerVertex(v)) { - ivdata_[v.GetValue()] = val; - } else { - ovdata_[outerVertexLidToIndex(v.GetValue())] = val; - } + CHECK(IsInnerVertex(v)); + ivdata_[v.GetValue()] = val; } bool OuterVertexGid2Lid(vid_t gid, vid_t& lid) const override { @@ -1347,6 +1369,108 @@ class DynamicFragment } } + void initVertexMembersOfFragment() { + alive_ivnum_ = ivnum_; + alive_ovnum_ = ovnum_; + iv_alive_.init(ivnum_); + ov_alive_.init(ovnum_); + for (size_t i = 0; i < ivnum_; i++) { + iv_alive_.set_bit(i); + } + for (size_t i = 0; i < ovnum_; i++) { + ov_alive_.set_bit(i); + } + is_selfloops_.init(ivnum_); + + this->inner_vertices_.SetRange(0, ivnum_); + this->outer_vertices_.SetRange(id_parser_.max_local_id() - ovnum_, + id_parser_.max_local_id()); + this->vertices_.SetRange(0, ivnum_, id_parser_.max_local_id() - ovnum_, + id_parser_.max_local_id()); + } + + void buildCSRParallel(std::vector>& edges, + const std::vector& oe_degree, + const std::vector& ie_degree, + uint32_t thread_num) { + ie_.reserve_vertices(ivnum_); + oe_.reserve_vertices(ivnum_); + + // parse edges, global id to local id + parallel_for( + edges.begin(), edges.end(), + [&](uint32_t tid, std::vector& es) { + if (load_strategy_ == grape::LoadStrategy::kOnlyOut) { + for (auto& e : es) { + CHECK(InnerVertexGid2Lid(e.src, e.src)); + CHECK(Gid2Lid(e.dst, e.dst)); + } + } else { + for (auto& e : es) { + CHECK(Gid2Lid(e.src, e.src)); + CHECK(Gid2Lid(e.dst, e.dst)); + } + } + }, + thread_num, 1); + + // insert the edges + insertEdgesParallel(edges, oe_degree, ie_degree, thread_num); + } + + void insertEdgesParallel(std::vector>& edges, + const std::vector& oe_degree, + const std::vector& ie_degree, + uint32_t thread_num) { + auto insert_edges_out_in = [&](uint32_t tid, std::vector& es) { + dynamic::Value tmp_data; // avoid to use default allocator on parallel + for (auto& e : es) { + if (e.src < ivnum_) { + if (e.dst < ivnum_) { + tmp_data.CopyFrom(e.edata, (*allocators_)[tid]); + nbr_t nbr(e.dst, std::move(tmp_data)); + oe_.put_edge(e.src, std::move(nbr)); + } else { + // avoid copy + nbr_t nbr(e.dst, std::move(e.edata)); + oe_.put_edge(e.src, std::move(nbr)); + } + } else { + nbr_t nbr(e.src, std::move(e.edata)); + ie_.put_edge(e.dst, std::move(nbr)); + } + } + }; + auto insert_edges_out = [&](uint32_t tid, std::vector& es) { + for (auto& e : es) { + nbr_t nbr(e.dst, std::move(e.edata)); + oe_.put_edge(e.src, std::move(nbr)); + } + }; + + oe_.reserve_edges_dense(oe_degree); + if (load_strategy_ == grape::LoadStrategy::kBothOutIn) { + ie_.reserve_edges_dense(ie_degree); + parallel_for(edges.begin(), edges.end(), insert_edges_out_in, thread_num, + 1); + // The incoming edges may not store in the same thread vector, + // can't be parallel process. 
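+      // (An inner-to-inner edge lives in the edge vector of the thread that
+      // owns its source vertex, but it must also be recorded as an in-edge of
+      // its destination, and several threads' vectors may hold edges pointing
+      // at the same destination; this is also why the out-edge pass above
+      // copies such edge data instead of moving it.)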
+ for (auto& vec : edges) { + for (auto& e : vec) { + if (e.src < ivnum_ && e.dst < ivnum_) { + nbr_t nbr(e.src, std::move(e.edata)); + ie_.put_edge(e.dst, std::move(nbr)); + } + } + } + ie_.sort_neighbors_dense(ie_degree); + } else { + parallel_for(edges.begin(), edges.end(), insert_edges_out, thread_num, 1); + } + oe_.sort_neighbors_dense(oe_degree); + } + + private: using base_t::ivnum_; vid_t ovnum_; vid_t alive_ivnum_, alive_ovnum_; @@ -1359,17 +1483,22 @@ class DynamicFragment ska::flat_hash_map ovg2i_; std::vector ovgid_; grape::Array> ivdata_; - grape::Array> ovdata_; grape::Bitset iv_alive_; grape::Bitset ov_alive_; grape::Bitset is_selfloops_; grape::VertexArray iespliter_, oespliter_; + // allocators for parallel convert + std::shared_ptr> allocators_; + using base_t::outer_vertices_of_frag_; template friend class DynamicProjectedFragment; + + template + friend class ArrowToDynamicConverter; }; class DynamicFragmentMutator { @@ -1489,7 +1618,8 @@ class DynamicFragmentMutator { } } else if (modify_type == rpc::NX_UPDATE_EDGES) { if (src_fid == fid || dst_fid == fid) { - mutation.edges_to_update.emplace_back(src_gid, dst_gid, e_data); + mutation.edges_to_update.emplace_back(src_gid, dst_gid, + std::move(e_data)); } } } diff --git a/analytical_engine/core/fragment/fragment_reporter.h b/analytical_engine/core/fragment/fragment_reporter.h index 2d46933bf88b..39cfe07329d6 100644 --- a/analytical_engine/core/fragment/fragment_reporter.h +++ b/analytical_engine/core/fragment/fragment_reporter.h @@ -572,7 +572,6 @@ class ArrowFragmentReporter> void getEdgeData(std::shared_ptr& fragment, label_id_t u_label_id, const oid_t& u_oid, label_id_t v_label_id, const oid_t& v_oid, grape::InArchive& arc) { - dynamic::Value ref_data; vid_t u_gid, v_gid; vertex_t u, v; auto vm_ptr = fragment->GetVertexMap(); @@ -585,7 +584,7 @@ class ArrowFragmentReporter> auto oe = fragment->GetOutgoingAdjList(u, e_label); for (auto& e : oe) { if (v == e.neighbor()) { - ref_data = dynamic::Value(rapidjson::kObjectType); + dynamic::Value ref_data(rapidjson::kObjectType); auto edge_data = fragment->edge_data_table(e_label); PropertyConverter::EdgeValue(edge_data, e.edge_id(), ref_data); diff --git a/analytical_engine/core/loader/arrow_to_dynamic_converter.h b/analytical_engine/core/loader/arrow_to_dynamic_converter.h index 3e29640e5fbd..7b263415e21b 100644 --- a/analytical_engine/core/loader/arrow_to_dynamic_converter.h +++ b/analytical_engine/core/loader/arrow_to_dynamic_converter.h @@ -83,11 +83,14 @@ struct DynamicWrapper { template class ArrowToDynamicConverter { using src_fragment_t = FRAG_T; + using vertex_t = typename src_fragment_t::vertex_t; using oid_t = typename src_fragment_t::oid_t; using label_id_t = typename src_fragment_t::label_id_t; using dst_fragment_t = DynamicFragment; using vertex_map_t = typename dst_fragment_t::vertex_map_t; using vid_t = typename dst_fragment_t::vid_t; + using internal_vertex_t = typename dst_fragment_t::internal_vertex_t; + using edge_t = typename dst_fragment_t::edge_t; using vdata_t = typename dst_fragment_t::vdata_t; using edata_t = typename dst_fragment_t::edata_t; @@ -98,7 +101,11 @@ class ArrowToDynamicConverter { bl::result> Convert( const std::shared_ptr& arrow_frag) { - auto arrow_vm = arrow_frag->GetVertexMap(); + arrow_vm_ptr_ = arrow_frag->GetVertexMap(); + CHECK(arrow_vm_ptr_->fnum() == comm_spec_.fnum()); + arrow_id_parser_.Init(comm_spec_.fnum(), arrow_vm_ptr_->label_num()); + dynamic_id_parser_.init(comm_spec_.fnum()); + BOOST_LEAF_AUTO(dynamic_vm, 
convertVertexMap(arrow_frag)); BOOST_LEAF_AUTO(dynamic_frag, convertFragment(arrow_frag, dynamic_vm)); return dynamic_frag; @@ -107,32 +114,37 @@ class ArrowToDynamicConverter { private: bl::result> convertVertexMap( const std::shared_ptr& arrow_frag) { - auto src_vm_ptr = arrow_frag->GetVertexMap(); const auto& schema = arrow_frag->schema(); - auto fnum = src_vm_ptr->fnum(); - auto dst_vm_ptr = std::make_shared(comm_spec_); - vineyard::IdParser id_parser; - CHECK(src_vm_ptr->fnum() == comm_spec_.fnum()); + auto dst_vm_ptr = std::make_shared(comm_spec_); dst_vm_ptr->Init(); typename vertex_map_t::partitioner_t partitioner(comm_spec_.fnum()); dst_vm_ptr->SetPartitioner(partitioner); - id_parser.Init(fnum, src_vm_ptr->label_num()); dynamic::Value to_oid; - for (label_id_t v_label = 0; v_label < src_vm_ptr->label_num(); v_label++) { - std::string label_name = schema.GetVertexLabelName(v_label); - for (fid_t fid = 0; fid < fnum; fid++) { - for (vid_t offset = 0; - offset < src_vm_ptr->GetInnerVertexSize(fid, v_label); offset++) { - auto gid = id_parser.GenerateId(fid, v_label, offset); - typename vineyard::InternalType::type oid; - - CHECK(src_vm_ptr->GetOid(gid, oid)); - if (v_label == default_label_id_) { + for (label_id_t v_label = 0; v_label < arrow_vm_ptr_->label_num(); + v_label++) { + if (v_label == default_label_id_) { + for (fid_t fid = 0; fid < comm_spec_.fnum(); fid++) { + for (vid_t offset = 0; + offset < arrow_vm_ptr_->GetInnerVertexSize(fid, v_label); + offset++) { + auto gid = arrow_id_parser_.GenerateId(fid, v_label, offset); + typename vineyard::InternalType::type oid; + CHECK(arrow_vm_ptr_->GetOid(gid, oid)); DynamicWrapper::to_dynamic(oid, to_oid); dst_vm_ptr->AddVertex(std::move(to_oid), gid); - } else { + } + } + } else { + std::string label_name = schema.GetVertexLabelName(v_label); + for (fid_t fid = 0; fid < comm_spec_.fnum(); fid++) { + for (vid_t offset = 0; + offset < arrow_vm_ptr_->GetInnerVertexSize(fid, v_label); + offset++) { + auto gid = arrow_id_parser_.GenerateId(fid, v_label, offset); + typename vineyard::InternalType::type oid; + CHECK(arrow_vm_ptr_->GetOid(gid, oid)); DynamicWrapper::to_dynamic_array(label_name, oid, to_oid); dst_vm_ptr->AddVertex(std::move(to_oid), gid); } @@ -147,93 +159,117 @@ class ArrowToDynamicConverter { const std::shared_ptr& src_frag, const std::shared_ptr& dst_vm) { auto fid = src_frag->fid(); - const auto& schema = src_frag->schema(); - dst_fragment_t::mutation_t mutation; + auto dynamic_frag = std::make_shared(dst_vm); + uint32_t thread_num = + (std::thread::hardware_concurrency() + comm_spec_.local_num() - 1) / + comm_spec_.local_num(); + + // Init allocators for dynamic fragment + dynamic_frag->allocators_ = + std::make_shared>(thread_num); + auto& allocators = dynamic_frag->allocators_; + std::vector> vertices(thread_num); + std::vector> edges(thread_num); + + // we record the degree messages here to avoid fetch these messages in + // dynamic_frag.Init again. 
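+    // (oe_degree[lid] / ie_degree[lid] accumulate the out-/in-degree of the
+    // inner vertex whose dynamic local id is lid; DynamicFragment::Init hands
+    // them to buildCSRParallel so the CSR rows can be reserved up front
+    // instead of rescanning the edge vectors.)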
+ std::vector oe_degree(dst_vm->GetInnerVertexSize(fid), 0); + std::vector ie_degree(dst_vm->GetInnerVertexSize(fid), 0); for (label_id_t v_label = 0; v_label < src_frag->vertex_label_num(); v_label++) { - auto label_name = schema.GetVertexLabelName(v_label); + auto inner_vertices = src_frag->InnerVertices(v_label); auto v_data = src_frag->vertex_data_table(v_label); - dynamic::Value u_oid, v_oid, data; - vid_t u_gid, v_gid; - - // traverse vertices and extract data from ArrowFragment - for (const auto& u : src_frag->InnerVertices(v_label)) { - if (v_label == default_label_id_) { - u_oid = dynamic::Value(src_frag->GetId(u)); - } else { - u_oid = dynamic::Value(rapidjson::kArrayType); - u_oid.PushBack(label_name).PushBack(src_frag->GetId(u)); - } - CHECK(dst_vm->GetGid(fid, u_oid, u_gid)); - data = dynamic::Value(rapidjson::kObjectType); - // N.B: th last column is id, we ignore it. - for (auto col_id = 0; col_id < v_data->num_columns() - 1; col_id++) { - auto column = v_data->column(col_id); - auto prop_key = v_data->field(col_id)->name(); - auto type = column->type(); - PropertyConverter::NodeValue(src_frag, u, type, - prop_key, col_id, data); - } - mutation.vertices_to_add.emplace_back(u_gid, data); - - // traverse edges and extract data - for (label_id_t e_label = 0; e_label < src_frag->edge_label_num(); - e_label++) { - auto oe = src_frag->GetOutgoingAdjList(u, e_label); - auto e_data = src_frag->edge_data_table(e_label); - for (auto& e : oe) { - auto v = e.get_neighbor(); - auto e_id = e.edge_id(); - auto v_label_id = src_frag->vertex_label(v); - if (v_label_id == default_label_id_) { - v_oid = dynamic::Value(src_frag->GetId(v)); - } else { - v_oid = dynamic::Value(rapidjson::kArrayType); - v_oid.PushBack(schema.GetVertexLabelName(v_label_id)) - .PushBack(src_frag->GetId(v)); + parallel_for( + inner_vertices.begin(), inner_vertices.end(), + [&](uint32_t tid, vertex_t u) { + vid_t u_gid = gid2Gid(src_frag->GetInnerVertexGid(u)); + vid_t lid = dynamic_id_parser_.get_local_id(u_gid); + // extract vertex properties + // N.B: th last column is id, we ignore it. 
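+            // (Property values built in this lambda are allocated through the
+            // per-thread allocator (*allocators)[tid], so dynamic::Value's
+            // shared static allocator is not used from multiple threads at
+            // once.)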
+ dynamic::Value vertex_data(rapidjson::kObjectType); + for (auto col_id = 0; col_id < v_data->num_columns() - 1; + col_id++) { + auto column = v_data->column(col_id); + auto& prop_key = v_data->field(col_id)->name(); + auto type = column->type(); + PropertyConverter::NodeValue( + src_frag, u, type, prop_key, col_id, vertex_data, + (*allocators)[tid]); } - CHECK(dst_vm->GetGid(v_oid, v_gid)); - data = dynamic::Value(rapidjson::kObjectType); - PropertyConverter::EdgeValue(e_data, e_id, data); - mutation.edges_to_add.emplace_back(u_gid, v_gid, data); - } + vertices[tid].emplace_back(lid, std::move(vertex_data)); - if (src_frag->directed()) { - auto ie = src_frag->GetIncomingAdjList(u, e_label); - for (auto& e : ie) { - auto v = e.get_neighbor(); - if (src_frag->IsOuterVertex(v)) { + // traverse edges and extract edge properties + for (label_id_t e_label = 0; e_label < src_frag->edge_label_num(); + e_label++) { + auto e_data = src_frag->edge_data_table(e_label); + auto oe = src_frag->GetOutgoingAdjList(u, e_label); + oe_degree[lid] += oe.Size(); + for (auto& e : oe) { + auto v = e.get_neighbor(); auto e_id = e.edge_id(); auto v_label_id = src_frag->vertex_label(v); - if (v_label_id == default_label_id_) { - v_oid = dynamic::Value(src_frag->GetId(v)); - } else { - v_oid = dynamic::Value(rapidjson::kArrayType); - v_oid.PushBack(schema.GetVertexLabelName(v_label_id)) - .PushBack(src_frag->GetId(v)); + vid_t v_gid = gid2Gid(src_frag->Vertex2Gid(v)); + dynamic::Value edge_data(rapidjson::kObjectType); + PropertyConverter::EdgeValue( + e_data, e_id, edge_data, (*allocators)[tid]); + edges[tid].emplace_back(u_gid, v_gid, std::move(edge_data)); + } + + if (src_frag->directed()) { + auto ie = src_frag->GetIncomingAdjList(u, e_label); + ie_degree[lid] += ie.Size(); + for (auto& e : ie) { + auto v = e.get_neighbor(); + if (src_frag->IsOuterVertex(v)) { + auto e_id = e.edge_id(); + auto v_label_id = src_frag->vertex_label(v); + vid_t v_gid = gid2Gid(src_frag->GetOuterVertexGid(v)); + dynamic::Value edge_data(rapidjson::kObjectType); + PropertyConverter::EdgeValue( + e_data, e_id, edge_data, (*allocators)[tid]); + edges[tid].emplace_back(v_gid, u_gid, std::move(edge_data)); + } } - CHECK(dst_vm->GetGid(v_oid, v_gid)); - data = dynamic::Value(rapidjson::kObjectType); - PropertyConverter::EdgeValue(e_data, e_id, - data); - mutation.edges_to_add.emplace_back(v_gid, u_gid, data); } } - } - } - } + }, + thread_num); } - auto dynamic_frag = std::make_shared(dst_vm); - dynamic_frag->Init(src_frag->fid(), src_frag->directed()); - dynamic_frag->Mutate(mutation); + dynamic_frag->Init(src_frag->fid(), src_frag->directed(), vertices, edges, + oe_degree, ie_degree, thread_num); + return dynamic_frag; } + /** + * Convert arrow fragment gid of vertex to corresponding dynamic fragment gid. + * In the covertVertexMap process, the insert order of vertex in dynamic + * fragment vertex map is the same as arrow fragment vertex map. 
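+ * For example (illustrative): if fragment fid has two vertex labels and
+ * label 0 holds n0 inner vertices, then a label-1 vertex at offset k maps
+ * to the dynamic gid on the same fragment with offset n0 + k.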
+ * + * Params: + * - gid: the arrow fragment gid of vertex + * + * Returns: + * The corresponding dynamic fragment gid of the vertex + */ + vid_t gid2Gid(const vid_t gid) const { + auto fid = arrow_id_parser_.GetFid(gid); + auto label_id = arrow_id_parser_.GetLabelId(gid); + auto offset = arrow_id_parser_.GetOffset(gid); + for (label_id_t i = 0; i < label_id; ++i) { + offset += arrow_vm_ptr_->GetInnerVertexSize(fid, i); + } + return dynamic_id_parser_.generate_global_id(fid, offset); + } + grape::CommSpec comm_spec_; label_id_t default_label_id_; + std::shared_ptr arrow_vm_ptr_; + vineyard::IdParser arrow_id_parser_; + grape::IdParser dynamic_id_parser_; }; } // namespace gs diff --git a/analytical_engine/core/object/dynamic.h b/analytical_engine/core/object/dynamic.h index 4d1b9c93b8c5..7813f5461f36 100644 --- a/analytical_engine/core/object/dynamic.h +++ b/analytical_engine/core/object/dynamic.h @@ -26,6 +26,8 @@ #include "rapidjson/stringbuffer.h" #include "rapidjson/writer.h" +#include "grape/serialization/in_archive.h" + namespace gs { namespace dynamic { @@ -57,9 +59,10 @@ class Value : public rapidjson::Value { } // Constructor with move semantics. Value(Value& rhs) { Base::CopyFrom(rhs, allocator_); } - explicit Value(rapidjson::Value& rhs) { Base::CopyFrom(rhs, allocator_); } + explicit Value(rapidjson::Value& rhs) : Base(std::move(rhs)) {} // Move constructor - Value(Value&& rhs) : Base(std::move(rhs)) {} + Value(Value&& rhs) noexcept : Base(std::move(rhs)) {} + explicit Value(rapidjson::Value&& rhs) : Base(std::move(rhs)) {} // Constructor with value type explicit Value(rapidjson::Type type) noexcept : Base(type) {} // Constructor for common type @@ -94,7 +97,11 @@ class Value : public rapidjson::Value { return *this; } Value& operator=(rapidjson::Value&& rhs) noexcept { - Base::operator=(rhs.Move()); + Base::operator=(rhs); + return *this; + } + Value& operator=(rapidjson::Value& rhs) noexcept { + Base::operator=(rhs); return *this; } @@ -130,9 +137,9 @@ class Value : public rapidjson::Value { explicit Value(const std::string& s) : Base(s.c_str(), allocator_) {} explicit Value(const char* s) : Base(s, allocator_) {} - void CopyFrom(const Value& rhs) { + void CopyFrom(const Value& rhs, AllocatorT& allocator = allocator_) { if (this != &rhs) { - Base::CopyFrom(rhs, allocator_); + Base::CopyFrom(rhs, allocator); } } @@ -140,15 +147,17 @@ class Value : public rapidjson::Value { template void Insert(const std::string& key, T&& value) { Value v_(value); - Base::AddMember(Value(key).Move(), v_, allocator_); + Base::AddMember(rapidjson::Value(key, allocator_).Move(), v_, allocator_); } void Insert(const std::string& key, Value& value) { - Base::AddMember(Value(key).Move(), value, allocator_); + Base::AddMember(rapidjson::Value(key, allocator_).Move(), value, + allocator_); } void Insert(const std::string& key, rapidjson::Value& value) { - Base::AddMember(Value(key).Move(), value, allocator_); + Base::AddMember(rapidjson::Value(key, allocator_).Move(), value, + allocator_); } // Update for object @@ -221,7 +230,7 @@ class Value : public rapidjson::Value { }; // Stringify Value to json. -static inline const char* Stringify(const Value& value) { +static inline const char* Stringify(const rapidjson::Value& value) { static rapidjson::StringBuffer buffer; rapidjson::Writer writer(buffer); buffer.Clear(); @@ -230,7 +239,7 @@ static inline const char* Stringify(const Value& value) { } // Parse json to Value. 
-static inline void Parse(const std::string& str, Value& val) { +static inline void Parse(const std::string& str, rapidjson::Value& val) { // the document d must use the same allocator with other values rapidjson::Document d(&Value::allocator_); d.Parse(str.c_str()); @@ -268,5 +277,41 @@ struct hash<::gs::dynamic::Value> { } // namespace std +namespace grape { +inline grape::InArchive& operator<<(grape::InArchive& archive, + const rapidjson::Value& value) { + if (value.IsInt64()) { + archive << value.GetInt64(); + } else if (value.IsDouble()) { + archive << value.GetDouble(); + } else if (value.IsString()) { + size_t size = value.GetStringLength(); + archive << size; + archive.AddBytes(value.GetString(), size); + } else { + std::string json = gs::dynamic::Stringify(value); + archive << json; + } + return archive; +} + +inline grape::InArchive& operator<<(grape::InArchive& archive, + const gs::dynamic::Value& value) { + if (value.IsInt64()) { + archive << value.GetInt64(); + } else if (value.IsDouble()) { + archive << value.GetDouble(); + } else if (value.IsString()) { + size_t size = value.GetStringLength(); + archive << size; + archive.AddBytes(value.GetString(), size); + } else { + std::string json = gs::dynamic::Stringify(value); + archive << json; + } + return archive; +} +} // namespace grape + #endif // NETWORKX #endif // ANALYTICAL_ENGINE_CORE_OBJECT_DYNAMIC_H_ diff --git a/analytical_engine/core/utils/convert_utils.h b/analytical_engine/core/utils/convert_utils.h index 11c654a28104..26102910ba2b 100644 --- a/analytical_engine/core/utils/convert_utils.h +++ b/analytical_engine/core/utils/convert_utils.h @@ -27,40 +27,56 @@ namespace gs { template struct PropertyConverter { - inline static void NodeValue(const std::shared_ptr& fragment, - const typename FRAGMENT_T::vertex_t& v, - const std::shared_ptr data_type, - const std::string& prop_name, int prop_id, - dynamic::Value& ret) { + inline static void NodeValue( + const std::shared_ptr& fragment, + const typename FRAGMENT_T::vertex_t& v, + const std::shared_ptr data_type, + const std::string& prop_name, int prop_id, rapidjson::Value& ret, + dynamic::AllocatorT& allocator = dynamic::Value::allocator_) { switch (data_type->id()) { case arrow::Type::type::INT32: { - ret.Insert(prop_name, fragment->template GetData(v, prop_id)); + rapidjson::Value v_(fragment->template GetData(v, prop_id)); + ret.AddMember(rapidjson::Value(prop_name, allocator).Move(), v_, + allocator); break; } case arrow::Type::type::INT64: { - ret.Insert(prop_name, fragment->template GetData(v, prop_id)); + rapidjson::Value v_(fragment->template GetData(v, prop_id)); + ret.AddMember(rapidjson::Value(prop_name, allocator).Move(), v_, + allocator); break; } case arrow::Type::type::UINT32: { - ret.Insert(prop_name, fragment->template GetData(v, prop_id)); + rapidjson::Value v_(fragment->template GetData(v, prop_id)); + ret.AddMember(rapidjson::Value(prop_name, allocator).Move(), v_, + allocator); break; } case arrow::Type::type::UINT64: { - ret.Insert(prop_name, fragment->template GetData(v, prop_id)); + rapidjson::Value v_(fragment->template GetData(v, prop_id)); + ret.AddMember(rapidjson::Value(prop_name, allocator).Move(), v_, + allocator); break; } case arrow::Type::type::FLOAT: { - ret.Insert(prop_name, fragment->template GetData(v, prop_id)); + rapidjson::Value v_(fragment->template GetData(v, prop_id)); + ret.AddMember(rapidjson::Value(prop_name, allocator).Move(), v_, + allocator); break; } case arrow::Type::type::DOUBLE: { - ret.Insert(prop_name, 
fragment->template GetData(v, prop_id)); + rapidjson::Value v_(fragment->template GetData(v, prop_id)); + ret.AddMember(rapidjson::Value(prop_name, allocator).Move(), v_, + allocator); break; } case arrow::Type::type::STRING: case arrow::Type::type::LARGE_STRING: { - ret.Insert(prop_name, - fragment->template GetData(v, prop_id)); + rapidjson::Value v_( + fragment->template GetData(v, prop_id).c_str(), + allocator); + ret.AddMember(rapidjson::Value(prop_name, allocator).Move(), v_, + allocator); break; } default: @@ -69,8 +85,10 @@ struct PropertyConverter { } } - inline static void EdgeValue(const std::shared_ptr& data_table, - int64_t row_id, dynamic::Value& ret) { + inline static void EdgeValue( + const std::shared_ptr& data_table, int64_t row_id, + rapidjson::Value& ret, + dynamic::AllocatorT& allocator = dynamic::Value::allocator_) { for (auto col_id = 0; col_id < data_table->num_columns(); col_id++) { auto column = data_table->column(col_id); auto type = data_table->column(col_id)->type(); @@ -79,43 +97,57 @@ struct PropertyConverter { case arrow::Type::type::INT32: { auto array = std::dynamic_pointer_cast(column->chunk(0)); - ret.Insert(property_name, array->Value(row_id)); + rapidjson::Value v(array->Value(row_id)); + ret.AddMember(rapidjson::Value(property_name, allocator).Move(), v, + allocator); break; } case arrow::Type::type::INT64: { auto array = std::dynamic_pointer_cast(column->chunk(0)); - ret.Insert(property_name, array->Value(row_id)); + rapidjson::Value v(array->Value(row_id)); + ret.AddMember(rapidjson::Value(property_name, allocator).Move(), v, + allocator); break; } case arrow::Type::type::UINT32: { auto array = std::dynamic_pointer_cast(column->chunk(0)); - ret.Insert(property_name, array->Value(row_id)); + rapidjson::Value v(array->Value(row_id)); + ret.AddMember(rapidjson::Value(property_name, allocator).Move(), v, + allocator); break; } case arrow::Type::type::FLOAT: { auto array = std::dynamic_pointer_cast(column->chunk(0)); - ret.Insert(property_name, array->Value(row_id)); + rapidjson::Value v(array->Value(row_id)); + ret.AddMember(rapidjson::Value(property_name, allocator).Move(), v, + allocator); break; } case arrow::Type::type::DOUBLE: { auto array = std::dynamic_pointer_cast(column->chunk(0)); - ret.Insert(property_name, array->Value(row_id)); + rapidjson::Value v(array->Value(row_id)); + ret.AddMember(rapidjson::Value(property_name, allocator).Move(), v, + allocator); break; } case arrow::Type::type::STRING: { auto array = std::dynamic_pointer_cast(column->chunk(0)); - ret.Insert(property_name, array->GetString(row_id)); + rapidjson::Value v(array->GetString(row_id).c_str(), allocator); + ret.AddMember(rapidjson::Value(property_name, allocator).Move(), v, + allocator); break; } case arrow::Type::type::LARGE_STRING: { auto array = std::dynamic_pointer_cast( column->chunk(0)); - ret.Insert(property_name, array->GetString(row_id)); + rapidjson::Value v(array->GetString(row_id).c_str(), allocator); + ret.AddMember(rapidjson::Value(property_name, allocator).Move(), v, + allocator); break; } default: diff --git a/analytical_engine/core/utils/transform_utils.h b/analytical_engine/core/utils/transform_utils.h index 85761399127a..fdcd8ae5efbf 100644 --- a/analytical_engine/core/utils/transform_utils.h +++ b/analytical_engine/core/utils/transform_utils.h @@ -16,6 +16,7 @@ limitations under the License. 
#ifndef ANALYTICAL_ENGINE_CORE_UTILS_TRANSFORM_UTILS_H_ #define ANALYTICAL_ENGINE_CORE_UTILS_TRANSFORM_UTILS_H_ +#include #include #include #include @@ -26,6 +27,7 @@ limitations under the License. #include "vineyard/basic/ds/dataframe.h" #include "vineyard/basic/ds/tensor.h" +#include "vineyard/graph/fragment/fragment_traits.h" #ifdef NETWORKX #include "core/object/dynamic.h" @@ -33,27 +35,6 @@ limitations under the License. #include "core/context/column.h" #include "core/utils/trait_utils.h" -#ifdef NETWORKX -namespace grape { -inline grape::InArchive& operator<<(grape::InArchive& archive, - const gs::dynamic::Value& value) { - if (value.IsInt64()) { - archive << value.GetInt64(); - } else if (value.IsDouble()) { - archive << value.GetDouble(); - } else if (value.IsString()) { - size_t size = value.GetStringLength(); - archive << size; - archive.AddBytes(value.GetString(), size); - } else { - std::string json = gs::dynamic::Stringify(value); - archive << json; - } - return archive; -} -} // namespace grape -#endif - namespace gs { #ifdef NETWORKX @@ -1085,6 +1066,31 @@ class TransformUtils< const FRAG_T& frag_; }; #endif + +template +void parallel_for(const ITER_T& begin, const ITER_T& end, const FUNC_T& func, + uint32_t thread_num, size_t chunk = 1024) { + std::vector threads(thread_num); + std::atomic cur(0); + for (uint32_t i = 0; i < thread_num; ++i) { + threads[i] = std::thread([&cur, chunk, &func, begin, end, i]() { + while (true) { + const ITER_T cur_beg = std::min(begin + cur.fetch_add(chunk), end); + const ITER_T cur_end = std::min(cur_beg + chunk, end); + if (cur_beg == cur_end) { + break; + } + for (auto iter = cur_beg; iter != cur_end; ++iter) { + func(i, *iter); + } + } + }); + } + for (auto& thrd : threads) { + thrd.join(); + } +} + } // namespace gs #endif // ANALYTICAL_ENGINE_CORE_UTILS_TRANSFORM_UTILS_H_ diff --git a/python/graphscope/framework/dag_utils.py b/python/graphscope/framework/dag_utils.py index 57f834dcbec9..93560a5b354d 100644 --- a/python/graphscope/framework/dag_utils.py +++ b/python/graphscope/framework/dag_utils.py @@ -770,7 +770,7 @@ def unload_graph(graph): if graph.graph_type == graph_def_pb2.ARROW_PROPERTY: inputs.append(graph.op) else: - config[types_pb2.GRAPH_NAME] = (utils.s_to_attr(graph.key),) + config[types_pb2.GRAPH_NAME] = utils.s_to_attr(graph.key) op = Operation( graph.session_id, types_pb2.UNLOAD_GRAPH, diff --git a/python/graphscope/nx/classes/graph.py b/python/graphscope/nx/classes/graph.py index aa8d21ea089c..4d0f0805264d 100644 --- a/python/graphscope/nx/classes/graph.py +++ b/python/graphscope/nx/classes/graph.py @@ -2090,18 +2090,15 @@ def _init_with_arrow_property_graph(self, arrow_property_graph): # check session and direction compatible if arrow_property_graph.session_id != self.session_id: raise NetworkXError( - "Try to init with another session's arrow_property graph." - + "Graphs must be the same session." + "The source graph is not loaded in session {}." % self.session_id ) if arrow_property_graph.is_directed() != self.is_directed(): - raise NetworkXError( - "Try to init with another direction type's arrow_property graph." - + "Graphs must have the same direction type." - ) - if arrow_property_graph._is_multigraph: - raise NetworkXError( - "Graph is multigraph, cannot be converted to networkx graph." - ) + if arrow_property_graph.is_directed(): + msg = "The source graph is a directed graph, can't be used to init nx.Graph. 
You may use nx.DiGraph" + else: + msg = "The source graph is a undirected graph, can't be used to init nx.DiGraph. You may use nx.Graph" + raise NetworkXError(msg) + self._key = arrow_property_graph.key self._schema = arrow_property_graph.schema if self._default_label is not None: diff --git a/python/graphscope/nx/tests/test_copy_on_write.py b/python/graphscope/nx/tests/test_copy_on_write.py index 6d4a43e07288..60d832a09d7a 100644 --- a/python/graphscope/nx/tests/test_copy_on_write.py +++ b/python/graphscope/nx/tests/test_copy_on_write.py @@ -297,12 +297,8 @@ def setup_method(self): # ).values # ) - def test_error_with_multigraph(self): - with pytest.raises( - NetworkXError, - match="Graph is multigraph, cannot be converted to networkx graph", - ): - MSG = nx.DiGraph(self.multi_simple) + def test_with_multigraph(self): + nx.DiGraph(self.multi_simple) def test_single_source_dijkstra_path_length(self): ret = nx.builtin.single_source_dijkstra_path_length(