Skip to content

Commit

Permalink
Optimize performance of arrow fragment to dynamic fragment with multi…
Browse files Browse the repository at this point in the history
…-threading (#1458)
  • Loading branch information
acezen committed Apr 24, 2022
1 parent daa8649 commit 8d5a029
Show file tree
Hide file tree
Showing 10 changed files with 442 additions and 201 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,6 @@ clean:
rm -fr $(WORKING_DIR)/learning_engine/graph-learn/proto/*.h || true && \
rm -fr $(WORKING_DIR)/learning_engine/graph-learn/proto/*.cc || true && \
rm -fr $(WORKING_DIR)/interactive_engine/executor/target || true && \
rm -fr $(WORKING_DIR)/interactive_engine/assembly/target || true
rm -fr $(WORKING_DIR)/interactive_engine/assembly/target || true && \
cd $(WORKING_DIR)/python && python3 setup.py clean --all && \
cd $(WORKING_DIR)/coordinator && python3 setup.py clean --all
214 changes: 172 additions & 42 deletions analytical_engine/core/fragment/dynamic_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include "core/object/dynamic.h"
#include "core/utils/partitioner.h"
#include "core/utils/transform_utils.h"
#include "proto/graphscope/proto/types.pb.h"

namespace gs {
Expand Down Expand Up @@ -122,7 +123,7 @@ class DynamicFragment
init(fid, directed);

load_strategy_ = directed ? grape::LoadStrategy::kBothOutIn
: grape::LoadStrategy::kOnlyIn;
: grape::LoadStrategy::kOnlyOut;

ovnum_ = 0;
static constexpr vid_t invalid_vid = std::numeric_limits<vid_t>::max();
Expand Down Expand Up @@ -162,53 +163,83 @@ class DynamicFragment
}
}

alive_ivnum_ = ivnum_;
alive_ovnum_ = ovnum_;
iv_alive_.init(ivnum_);
ov_alive_.init(ovnum_);
for (size_t i = 0; i < ivnum_; i++) {
iv_alive_.set_bit(i);
}
for (size_t i = 0; i < ovnum_; i++) {
ov_alive_.set_bit(i);
}
is_selfloops_.init(ivnum_);

this->inner_vertices_.SetRange(0, ivnum_);
this->outer_vertices_.SetRange(id_parser_.max_local_id() - ovnum_,
id_parser_.max_local_id());
this->vertices_.SetRange(0, ivnum_, id_parser_.max_local_id() - ovnum_,
id_parser_.max_local_id());
initVertexMembersOfFragment();
initOuterVerticesOfFragment();

buildCSR(this->InnerVertices(), edges, load_strategy_);

ivdata_.clear();
ivdata_.resize(ivnum_, dynamic::Value(rapidjson::kObjectType));
ovdata_.clear();
ovdata_.resize(ovnum_);
if (sizeof(internal_vertex_t) > sizeof(vid_t)) {
for (auto& v : vertices) {
vid_t gid = v.vid;
if (id_parser_.get_fragment_id(gid) == fid_) {
ivdata_[id_parser_.get_local_id(gid)].Update(v.vdata);
} else {
auto iter = ovg2i_.find(gid);
if (iter != ovg2i_.end()) {
auto index = outerVertexLidToIndex(iter->second);
ovdata_[index] = std::move(v.vdata);
}
ivdata_[id_parser_.get_local_id(gid)] = std::move(v.vdata);
}
}
}
}

// Init an empty fragment: forwards to the Init overload that takes a vertex
// vector and an edge vector, handing it two empty containers.
void Init(fid_t fid, bool directed) {
  std::vector<internal_vertex_t> no_vertices;
  std::vector<edge_t> no_edges;
  Init(fid, directed, no_vertices, no_edges);
}

// Init the fragment from data converted out of an arrow property fragment.
//
// vertices:   batches of vertices; every entry's vid is used directly as the
//             index into ivdata_ and its vdata is moved in.
// edges:      batches of edges whose src/dst are tested with IsInnerVertexGid
//             here (i.e. still global ids); they are scanned to register
//             outer vertices and then handed to buildCSRParallel.
// oe_degree / ie_degree: forwarded untouched to buildCSRParallel.
// thread_num: forwarded to parallel_for and buildCSRParallel.
void Init(fid_t fid, bool directed,
          std::vector<std::vector<internal_vertex_t>>& vertices,
          std::vector<std::vector<edge_t>>& edges,
          std::vector<int>& oe_degree, std::vector<int>& ie_degree,
          uint32_t thread_num) {
  init(fid, directed);
  if (directed) {
    load_strategy_ = grape::LoadStrategy::kBothOutIn;
  } else {
    load_strategy_ = grape::LoadStrategy::kOnlyOut;
  }

  // Register every outer vertex referenced by the edges, in edge order.
  ovnum_ = 0;
  const bool out_only = load_strategy_ == grape::LoadStrategy::kOnlyOut;
  const bool out_and_in = load_strategy_ == grape::LoadStrategy::kBothOutIn;
  for (auto& batch : edges) {
    for (auto& edge : batch) {
      if (out_only) {
        // Only the destination is examined in this mode.
        if (!IsInnerVertexGid(edge.dst)) {
          parseOrAddOuterVertexGid(edge.dst);
        }
      } else if (out_and_in) {
        // A non-inner source is registered; otherwise a non-inner
        // destination is.
        if (!IsInnerVertexGid(edge.src)) {
          parseOrAddOuterVertexGid(edge.src);
        } else if (!IsInnerVertexGid(edge.dst)) {
          parseOrAddOuterVertexGid(edge.dst);
        }
      }
    }
  }

  initVertexMembersOfFragment();
  initOuterVerticesOfFragment();

  buildCSRParallel(edges, oe_degree, ie_degree, thread_num);

  ivdata_.clear();
  ivdata_.resize(ivnum_);
  // Vertex data is filled in parallel, one batch per task. The step is
  // skipped when internal_vertex_t is no larger than vid_t (presumably the
  // case where vertices carry no data -- TODO confirm).
  if (sizeof(internal_vertex_t) > sizeof(vid_t)) {
    auto store_vertex_data = [&](uint32_t /*tid*/,
                                 std::vector<internal_vertex_t>& batch) {
      for (auto& vertex : batch) {
        ivdata_[vertex.vid] = std::move(vertex.vdata);
      }
    };
    parallel_for(vertices.begin(), vertices.end(), store_vertex_data,
                 thread_num, 1);
  }
}

using base_t::Gid2Lid;
using base_t::ie_;
using base_t::oe_;
Expand Down Expand Up @@ -321,7 +352,6 @@ class DynamicFragment
id_parser_.max_local_id());
}
ivdata_.resize(this->ivnum_, dynamic::Value(rapidjson::kObjectType));
ovdata_.resize(this->ovnum_);
iv_alive_.resize(this->ivnum_);
ov_alive_.resize(this->ovnum_);
alive_ovnum_ = this->ovnum_;
Expand All @@ -337,7 +367,6 @@ class DynamicFragment
} else {
if (this->OuterVertexGid2Lid(v.vid, lid)) {
auto index = outerVertexLidToIndex(lid);
ovdata_[index] = std::move(v.vdata);
if (ov_alive_.get_bit(index) == false) {
ov_alive_.set_bit(index);
}
Expand All @@ -349,10 +378,6 @@ class DynamicFragment
if (IsInnerVertexGid(v.vid)) {
this->InnerVertexGid2Lid(v.vid, lid);
ivdata_[lid] = std::move(v.vdata);
} else {
if (this->OuterVertexGid2Lid(v.vid, lid)) {
ovdata_[outerVertexLidToIndex(lid)] = std::move(v.vdata);
}
}
}
}
Expand Down Expand Up @@ -383,16 +408,13 @@ class DynamicFragment
// Reports the outer-vertex count tracked in alive_ovnum_.
vid_t GetOuterVerticesNum() const {
  return alive_ovnum_;
}

// Returns the data stored for inner vertex `v`.
//
// Data is looked up in ivdata_ only; the CHECK fails for any vertex that is
// not an inner vertex. The earlier ternary that fell back to ovdata_ returned
// before the CHECK could run and read an array the Init paths no longer size,
// so it is dropped.
inline const vdata_t& GetData(const vertex_t& v) const override {
  CHECK(IsInnerVertex(v));
  return ivdata_[v.GetValue()];
}

// Stores `val` as the data of inner vertex `v`.
//
// The CHECK runs before any write and fails for a vertex that is not inner.
// Previously an if/else wrote to ovdata_ for such vertices first, and the
// CHECK plus a second ivdata_ write followed it; the block now validates once
// and writes once.
inline void SetData(const vertex_t& v, const vdata_t& val) override {
  CHECK(IsInnerVertex(v));
  ivdata_[v.GetValue()] = val;
}

bool OuterVertexGid2Lid(vid_t gid, vid_t& lid) const override {
Expand Down Expand Up @@ -1347,6 +1369,108 @@ class DynamicFragment
}
}

// Resets the per-vertex bookkeeping after ivnum_ / ovnum_ are known: alive
// counters, alive bitsets (every bit set), the self-loop bitset, and the
// inner / outer / combined vertex ranges.
void initVertexMembersOfFragment() {
  // Every inner and outer vertex starts out alive.
  alive_ivnum_ = ivnum_;
  alive_ovnum_ = ovnum_;

  iv_alive_.init(ivnum_);
  ov_alive_.init(ovnum_);
  for (size_t iv = 0; iv < ivnum_; ++iv) {
    iv_alive_.set_bit(iv);
  }
  for (size_t ov = 0; ov < ovnum_; ++ov) {
    ov_alive_.set_bit(ov);
  }

  is_selfloops_.init(ivnum_);

  // Inner vertices cover [0, ivnum_); outer vertices cover the last ovnum_
  // ids below id_parser_.max_local_id().
  const auto outer_end = id_parser_.max_local_id();
  const auto outer_begin = outer_end - ovnum_;
  this->inner_vertices_.SetRange(0, ivnum_);
  this->outer_vertices_.SetRange(outer_begin, outer_end);
  this->vertices_.SetRange(0, ivnum_, outer_begin, outer_end);
}

// Builds ie_ / oe_ from batched edges: reserves ivnum_ vertex slots in both,
// rewrites every edge's src/dst in place from global id to local id (one
// parallel task per batch), then delegates to insertEdgesParallel.
void buildCSRParallel(std::vector<std::vector<edge_t>>& edges,
                      const std::vector<int>& oe_degree,
                      const std::vector<int>& ie_degree,
                      uint32_t thread_num) {
  ie_.reserve_vertices(ivnum_);
  oe_.reserve_vertices(ivnum_);

  // Global id -> local id. Under kOnlyOut the source is resolved with
  // InnerVertexGid2Lid; in every other case both ends go through Gid2Lid.
  // A failed lookup trips the CHECK.
  auto gid_to_lid = [&](uint32_t /*tid*/, std::vector<edge_t>& batch) {
    const bool src_is_inner =
        load_strategy_ == grape::LoadStrategy::kOnlyOut;
    for (auto& e : batch) {
      if (src_is_inner) {
        CHECK(InnerVertexGid2Lid(e.src, e.src));
      } else {
        CHECK(Gid2Lid(e.src, e.src));
      }
      CHECK(Gid2Lid(e.dst, e.dst));
    }
  };
  parallel_for(edges.begin(), edges.end(), gid_to_lid, thread_num, 1);

  // With local ids in place, fill the adjacency structures.
  insertEdgesParallel(edges, oe_degree, ie_degree, thread_num);
}

// Inserts the batched edges into oe_ (and, under kBothOutIn, ie_), then sorts
// the neighbor lists.
//
// src/dst are compared against ivnum_ to tell inner vertices apart, so they
// are expected to be local ids already (buildCSRParallel converts them before
// calling this).
//   edges:      batches of edges; edata is moved out of them.
//   oe_degree:  passed to oe_.reserve_edges_dense / oe_.sort_neighbors_dense.
//   ie_degree:  passed to ie_.reserve_edges_dense / ie_.sort_neighbors_dense
//               (kBothOutIn only).
//   thread_num: passed to parallel_for.
void insertEdgesParallel(std::vector<std::vector<edge_t>>& edges,
const std::vector<int>& oe_degree,
const std::vector<int>& ie_degree,
uint32_t thread_num) {
// Per-batch worker for kBothOutIn.
auto insert_edges_out_in = [&](uint32_t tid, std::vector<edge_t>& es) {
// Scratch value for the copy below; it is filled through the allocator
// selected by tid so the default allocator is not used in parallel.
dynamic::Value tmp_data; // avoid to use default allocator on parallel
for (auto& e : es) {
if (e.src < ivnum_) {
if (e.dst < ivnum_) {
// Both ends below ivnum_: the outgoing side gets a copy, leaving
// e.edata intact for the sequential incoming-edge pass further down.
tmp_data.CopyFrom(e.edata, (*allocators_)[tid]);
nbr_t nbr(e.dst, std::move(tmp_data));
oe_.put_edge(e.src, std::move(nbr));
} else {
// dst is not below ivnum_: this edge is stored only once, so the data
// can be moved instead of copied.
nbr_t nbr(e.dst, std::move(e.edata));
oe_.put_edge(e.src, std::move(nbr));
}
} else {
// src is not below ivnum_: record the edge as an incoming edge of dst.
nbr_t nbr(e.src, std::move(e.edata));
ie_.put_edge(e.dst, std::move(nbr));
}
}
};
// Per-batch worker for the other strategy: every edge goes to oe_ with its
// data moved.
auto insert_edges_out = [&](uint32_t tid, std::vector<edge_t>& es) {
for (auto& e : es) {
nbr_t nbr(e.dst, std::move(e.edata));
oe_.put_edge(e.src, std::move(nbr));
}
};

oe_.reserve_edges_dense(oe_degree);
if (load_strategy_ == grape::LoadStrategy::kBothOutIn) {
ie_.reserve_edges_dense(ie_degree);
parallel_for(edges.begin(), edges.end(), insert_edges_out_in, thread_num,
1);
// Incoming edges of one vertex may be spread over several batches, so
// this pass is not run in parallel. It adds the incoming side of every
// edge whose ends are both below ivnum_, moving the e.edata that the
// parallel pass only copied.
for (auto& vec : edges) {
for (auto& e : vec) {
if (e.src < ivnum_ && e.dst < ivnum_) {
nbr_t nbr(e.src, std::move(e.edata));
ie_.put_edge(e.dst, std::move(nbr));
}
}
}
ie_.sort_neighbors_dense(ie_degree);
} else {
parallel_for(edges.begin(), edges.end(), insert_edges_out, thread_num, 1);
}
oe_.sort_neighbors_dense(oe_degree);
}

private:
using base_t::ivnum_;
vid_t ovnum_;
vid_t alive_ivnum_, alive_ovnum_;
Expand All @@ -1359,17 +1483,22 @@ class DynamicFragment
ska::flat_hash_map<vid_t, vid_t> ovg2i_;
std::vector<vid_t> ovgid_;
grape::Array<vdata_t, grape::Allocator<vdata_t>> ivdata_;
grape::Array<vdata_t, grape::Allocator<vdata_t>> ovdata_;
grape::Bitset iv_alive_;
grape::Bitset ov_alive_;
grape::Bitset is_selfloops_;

grape::VertexArray<inner_vertices_t, nbr_t*> iespliter_, oespliter_;

// Allocators indexed by thread id, used during the parallel conversion.
std::shared_ptr<std::vector<dynamic::AllocatorT>> allocators_;

using base_t::outer_vertices_of_frag_;

template <typename _vdata_t, typename _edata_t>
friend class DynamicProjectedFragment;

template <typename FRAG_T>
friend class ArrowToDynamicConverter;
};

class DynamicFragmentMutator {
Expand Down Expand Up @@ -1489,7 +1618,8 @@ class DynamicFragmentMutator {
}
} else if (modify_type == rpc::NX_UPDATE_EDGES) {
if (src_fid == fid || dst_fid == fid) {
mutation.edges_to_update.emplace_back(src_gid, dst_gid, e_data);
mutation.edges_to_update.emplace_back(src_gid, dst_gid,
std::move(e_data));
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions analytical_engine/core/fragment/fragment_reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,6 @@ class ArrowFragmentReporter<vineyard::ArrowFragment<OID_T, VID_T>>
void getEdgeData(std::shared_ptr<fragment_t>& fragment, label_id_t u_label_id,
const oid_t& u_oid, label_id_t v_label_id,
const oid_t& v_oid, grape::InArchive& arc) {
dynamic::Value ref_data;
vid_t u_gid, v_gid;
vertex_t u, v;
auto vm_ptr = fragment->GetVertexMap();
Expand All @@ -585,7 +584,7 @@ class ArrowFragmentReporter<vineyard::ArrowFragment<OID_T, VID_T>>
auto oe = fragment->GetOutgoingAdjList(u, e_label);
for (auto& e : oe) {
if (v == e.neighbor()) {
ref_data = dynamic::Value(rapidjson::kObjectType);
dynamic::Value ref_data(rapidjson::kObjectType);
auto edge_data = fragment->edge_data_table(e_label);
PropertyConverter<fragment_t>::EdgeValue(edge_data, e.edge_id(),
ref_data);
Expand Down

0 comments on commit 8d5a029

Please sign in to comment.