From 2e0fc70f38e1011ad0ba46333beb1bb826669bd2 Mon Sep 17 00:00:00 2001 From: liulx20 Date: Wed, 19 Jun 2024 18:14:36 +0800 Subject: [PATCH] fix --- flex/bin/bulk_loader.cc | 21 ++- flex/storages/rt_mutable_graph/csr/csr_base.h | 5 +- .../rt_mutable_graph/csr/immutable_csr.h | 75 ++++++++-- .../rt_mutable_graph/csr/mutable_csr.h | 107 +++++++++++++-- flex/storages/rt_mutable_graph/dual_csr.h | 44 ++++-- .../loader/abstract_arrow_fragment_loader.cc | 4 - .../loader/abstract_arrow_fragment_loader.h | 129 +++++++----------- .../loader/basic_fragment_loader.h | 39 ++++-- .../loader/csv_fragment_loader.cc | 4 +- .../loader/csv_fragment_loader.h | 6 +- .../rt_mutable_graph/loader/load_utils.h | 79 ----------- .../rt_mutable_graph/loader/loader_factory.cc | 6 +- .../rt_mutable_graph/loader/loader_factory.h | 4 +- .../loader/odps_fragment_loader.cc | 4 +- .../loader/odps_fragment_loader.h | 6 +- flex/tests/hqps/admin_http_test.cc | 30 ---- flex/utils/arrow_utils.cc | 4 - flex/utils/arrow_utils.h | 20 --- flex/utils/id_indexer.h | 1 - flex/utils/mmap_array.h | 10 +- flex/utils/mmap_vector.h | 38 ++---- flex/utils/property/column.h | 8 +- 22 files changed, 329 insertions(+), 315 deletions(-) delete mode 100644 flex/storages/rt_mutable_graph/loader/load_utils.h diff --git a/flex/bin/bulk_loader.cc b/flex/bin/bulk_loader.cc index cb0e369afbdb..d462b5e97440 100644 --- a/flex/bin/bulk_loader.cc +++ b/flex/bin/bulk_loader.cc @@ -47,19 +47,19 @@ void signal_handler(int signal) { int main(int argc, char** argv) { bpo::options_description desc("Usage:"); /** - * When reading the edges of a graph, there are two stages involved. + * When loading the edges of a graph, there are two stages involved. * * The first stage involves reading the edges into a temporary vector and * acquiring information on the degrees of the vertices, * Then constructs the CSR using the degree information. 
* * During the first stage, the edges are stored in the form of triplets, which - * can lead to a certain amount of memory expansion, so the `use_mmap_vector` + * can lead to a certain amount of memory expansion, so the `use-mmap-vector` * option is provided, mmap_vector utilizes mmap to map files, supporting * runtime memory swapping to disk. * - * Constructing the CSR involves random reads and writes,we offer the - * `batch_init_in_memory` option, which allows CSR to be built in-memory to + * Constructing the CSR involves random reads and writes, so we offer the + * `build-csr-in-mem` option, which allows the CSR to be built in memory to * avoid extensive disk random read and write operations * */ @@ -70,7 +70,7 @@ int main(int argc, char** argv) { "data-path,d", bpo::value(), "data directory path")( "graph-config,g", bpo::value(), "graph schema config file")( "bulk-load,l", bpo::value(), "bulk-load config file")( - "memory-batch-init,m", bpo::value(), "batch init in memory")( + "build-csr-in-mem,m", bpo::value(), "build csr in memory")( "use-mmap-vector", bpo::value(), "use mmap vector"); google::InitGoogleLogging(argv[0]); @@ -110,11 +110,10 @@ int main(int argc, char** argv) { return -1; } bulk_load_config_path = vm["bulk-load"].as(); - bool batch_init_in_memory = false; - if (vm.count("memory-batch-init")) { - batch_init_in_memory = vm["memory-batch-init"].as(); - LOG(INFO) << "batch init in memory: " - << static_cast(batch_init_in_memory); + bool build_csr_in_mem = false; + if (vm.count("build-csr-in-mem")) { + build_csr_in_mem = vm["build-csr-in-mem"].as(); + LOG(INFO) << "batch init in memory: " << static_cast(build_csr_in_mem); } bool use_mmap_vector = false; @@ -165,7 +164,7 @@ int main(int argc, char** argv) { auto loader = gs::LoaderFactory::CreateFragmentLoader( data_dir_path.string(), schema_res.value(), loading_config_res.value(), - parallelism, batch_init_in_memory, use_mmap_vector); + parallelism, build_csr_in_mem, use_mmap_vector); 
loader->LoadFragment(); t += grape::GetCurrentTime(); diff --git a/flex/storages/rt_mutable_graph/csr/csr_base.h b/flex/storages/rt_mutable_graph/csr/csr_base.h index dc359ea8151a..67db79ff9c34 100644 --- a/flex/storages/rt_mutable_graph/csr/csr_base.h +++ b/flex/storages/rt_mutable_graph/csr/csr_base.h @@ -65,8 +65,11 @@ class CsrBase { virtual size_t batch_init(const std::string& name, const std::string& work_dir, const std::vector& degree, - bool batch_init_in_memory = false, double reserve_ratio = 1.2) = 0; + + virtual size_t batch_init_in_memory(const std::vector& degree, + double reserve_ratio = 1.2) = 0; + virtual void batch_sort_by_edge_data(timestamp_t ts) { LOG(FATAL) << "not supported..."; } diff --git a/flex/storages/rt_mutable_graph/csr/immutable_csr.h b/flex/storages/rt_mutable_graph/csr/immutable_csr.h index 526df3bbb516..3aa3b77cca39 100644 --- a/flex/storages/rt_mutable_graph/csr/immutable_csr.h +++ b/flex/storages/rt_mutable_graph/csr/immutable_csr.h @@ -56,10 +56,44 @@ class ImmutableCsr : public TypedImmutableCsrBase { using slice_t = ImmutableNbrSlice; size_t batch_init(const std::string& name, const std::string& work_dir, - const std::vector& degree, bool batch_init_in_memory, + const std::vector& degree, bool build_csr_in_mem, double reserve_ratio) override { size_t vnum = degree.size(); - adj_lists_.open(work_dir + "/" + name + ".adj", !batch_init_in_memory); + adj_lists_.open(work_dir + "/" + name + ".adj", true); + adj_lists_.resize(vnum); + + size_t edge_num = 0; + for (auto d : degree) { + edge_num += d; + } + + nbr_list_.open(work_dir + "/" + name + ".nbr", true); + nbr_list_.resize(edge_num); + + degree_list_.open(work_dir + "/" + name + ".deg", true); + degree_list_.resize(vnum); + + nbr_t* ptr = nbr_list_.data(); + for (vid_t i = 0; i < vnum; ++i) { + int deg = degree[i]; + if (deg != 0) { + adj_lists_[i] = ptr; + } else { + adj_lists_[i] = NULL; + } + ptr += deg; + + degree_list_[i] = 0; + } + + unsorted_since_ = 0; + return 
edge_num; + } + + size_t batch_init_in_memory(const std::vector& degree, + double reserve_ratio) override { + size_t vnum = degree.size(); + adj_lists_.reset(); adj_lists_.resize(vnum); size_t edge_num = 0; @@ -67,10 +101,10 @@ class ImmutableCsr : public TypedImmutableCsrBase { edge_num += d; } - nbr_list_.open(work_dir + "/" + name + ".nbr", !batch_init_in_memory); + nbr_list_.reset(); nbr_list_.resize(edge_num); - degree_list_.open(work_dir + "/" + name + ".deg", !batch_init_in_memory); + degree_list_.reset(); degree_list_.resize(vnum); nbr_t* ptr = nbr_list_.data(); @@ -290,10 +324,21 @@ class SingleImmutableCsr : public TypedImmutableCsrBase { ~SingleImmutableCsr() {} size_t batch_init(const std::string& name, const std::string& work_dir, - const std::vector& degree, bool batch_init_in_memory, + const std::vector& degree, double reserve_ratio) override { size_t vnum = degree.size(); - nbr_list_.open(work_dir + "/" + name + ".snbr", !batch_init_in_memory); + nbr_list_.open(work_dir + "/" + name + ".snbr", true); + nbr_list_.resize(vnum); + for (size_t k = 0; k != vnum; ++k) { + nbr_list_[k].neighbor = std::numeric_limits::max(); + } + return vnum; + } + + size_t batch_init_in_memory(const std::vector& degree, + double reserve_ratio) override { + size_t vnum = degree.size(); + nbr_list_.reset(); nbr_list_.resize(vnum); for (size_t k = 0; k != vnum; ++k) { nbr_list_[k].neighbor = std::numeric_limits::max(); @@ -441,8 +486,6 @@ class SingleImmutableCsr : public TypedImmutableCsrBase { const nbr_t& get_edge(vid_t i) const { return nbr_list_[i]; } - void close() override { nbr_list_.reset(); } - private: mmap_array nbr_list_; }; @@ -458,10 +501,21 @@ class SingleImmutableCsr ~SingleImmutableCsr() {} size_t batch_init(const std::string& name, const std::string& work_dir, - const std::vector& degree, bool batch_init_in_memory, + const std::vector& degree, double reserve_ratio) override { size_t vnum = degree.size(); - nbr_list_.open(work_dir + "/" + name + ".snbr", 
!batch_init_in_memory); + nbr_list_.open(work_dir + "/" + name + ".snbr", true); + nbr_list_.resize(vnum); + for (size_t k = 0; k != vnum; ++k) { + nbr_list_[k].neighbor = std::numeric_limits::max(); + } + return vnum; + } + + size_t batch_init_in_memory(const std::vector& degree, + double reserve_ratio) override { + size_t vnum = degree.size(); + nbr_list_.reset(); nbr_list_.resize(vnum); for (size_t k = 0; k != vnum; ++k) { nbr_list_[k].neighbor = std::numeric_limits::max(); @@ -613,6 +667,7 @@ class SingleImmutableCsr return nbr; } void close() override { nbr_list_.reset(); } + private: StringColumn& column_; mmap_array nbr_list_; diff --git a/flex/storages/rt_mutable_graph/csr/mutable_csr.h b/flex/storages/rt_mutable_graph/csr/mutable_csr.h index 1e8d4c7c3284..dc502cd95335 100644 --- a/flex/storages/rt_mutable_graph/csr/mutable_csr.h +++ b/flex/storages/rt_mutable_graph/csr/mutable_csr.h @@ -149,11 +149,39 @@ class MutableCsr : public TypedMutableCsrBase { } size_t batch_init(const std::string& name, const std::string& work_dir, - const std::vector& degree, bool batch_init_in_memory, + const std::vector& degree, double reserve_ratio) override { reserve_ratio = std::max(reserve_ratio, 1.0); size_t vnum = degree.size(); - adj_lists_.open(work_dir + "/" + name + ".adj", !batch_init_in_memory); + adj_lists_.open(work_dir + "/" + name + ".adj", true); + adj_lists_.resize(vnum); + + locks_ = new grape::SpinLock[vnum]; + + size_t edge_num = 0; + for (auto d : degree) { + edge_num += (std::ceil(d * reserve_ratio)); + } + nbr_list_.open(work_dir + "/" + name + ".nbr", true); + nbr_list_.resize(edge_num); + + nbr_t* ptr = nbr_list_.data(); + for (vid_t i = 0; i < vnum; ++i) { + int deg = degree[i]; + int cap = std::ceil(deg * reserve_ratio); + adj_lists_[i].init(ptr, cap, 0); + ptr += cap; + } + + unsorted_since_ = 0; + return edge_num; + } + + size_t batch_init_in_memory(const std::vector& degree, + double reserve_ratio) override { + reserve_ratio = 
std::max(reserve_ratio, 1.0); + size_t vnum = degree.size(); + adj_lists_.reset(); adj_lists_.resize(vnum); locks_ = new grape::SpinLock[vnum]; @@ -162,7 +190,7 @@ class MutableCsr : public TypedMutableCsrBase { for (auto d : degree) { edge_num += (std::ceil(d * reserve_ratio)); } - nbr_list_.open(work_dir + "/" + name + ".nbr", !batch_init_in_memory); + nbr_list_.reset(); nbr_list_.resize(edge_num); nbr_t* ptr = nbr_list_.data(); @@ -474,10 +502,10 @@ class MutableCsr } size_t batch_init(const std::string& name, const std::string& work_dir, - const std::vector& degree, bool batch_init_in_memory, + const std::vector& degree, double reserve_ratio) override { size_t vnum = degree.size(); - adj_lists_.open(work_dir + "/" + name + ".adj", !batch_init_in_memory); + adj_lists_.open(work_dir + "/" + name + ".adj", true); adj_lists_.resize(vnum); locks_ = new grape::SpinLock[vnum]; @@ -486,7 +514,31 @@ class MutableCsr for (auto d : degree) { edge_num += d; } - nbr_list_.open(work_dir + "/" + name + ".nbr", !batch_init_in_memory); + nbr_list_.open(work_dir + "/" + name + ".nbr", true); + nbr_list_.resize(edge_num); + + nbr_t* ptr = nbr_list_.data(); + for (vid_t i = 0; i < vnum; ++i) { + int deg = degree[i]; + adj_lists_[i].init(ptr, deg, 0); + ptr += deg; + } + return edge_num; + } + + size_t batch_init_in_memory(const std::vector& degree, + double reserve) override { + size_t vnum = degree.size(); + adj_lists_.reset(); + adj_lists_.resize(vnum); + + locks_ = new grape::SpinLock[vnum]; + + size_t edge_num = 0; + for (auto d : degree) { + edge_num += d; + } + nbr_list_.reset(); nbr_list_.resize(edge_num); nbr_t* ptr = nbr_list_.data(); @@ -686,10 +738,21 @@ class SingleMutableCsr : public TypedMutableCsrBase { ~SingleMutableCsr() {} size_t batch_init(const std::string& name, const std::string& work_dir, - const std::vector& degree, bool batch_init_in_memory, + const std::vector& degree, double reserve_ratio) override { size_t vnum = degree.size(); - nbr_list_.open(work_dir 
+ "/" + name + ".snbr", !batch_init_in_memory); + nbr_list_.open(work_dir + "/" + name + ".snbr", true); + nbr_list_.resize(vnum); + for (size_t k = 0; k != vnum; ++k) { + nbr_list_[k].timestamp.store(std::numeric_limits::max()); + } + return vnum; + } + + size_t batch_init_in_memory(const std::vector& degree, + double reserve_ratio) override { + size_t vnum = degree.size(); + nbr_list_.reset(); nbr_list_.resize(vnum); for (size_t k = 0; k != vnum; ++k) { nbr_list_[k].timestamp.store(std::numeric_limits::max()); @@ -868,10 +931,21 @@ class SingleMutableCsr ~SingleMutableCsr() {} size_t batch_init(const std::string& name, const std::string& work_dir, - const std::vector& degree, bool batch_init_in_memory, + const std::vector& degree, double reserve_ratio) override { size_t vnum = degree.size(); - nbr_list_.open(work_dir + "/" + name + ".snbr", !batch_init_in_memory); + nbr_list_.open(work_dir + "/" + name + ".snbr", true); + nbr_list_.resize(vnum); + for (size_t k = 0; k != vnum; ++k) { + nbr_list_[k].timestamp.store(std::numeric_limits::max()); + } + return vnum; + } + + size_t batch_init_in_memory(const std::vector& degree, + double reserve_ratio) override { + size_t vnum = degree.size(); + nbr_list_.reset(); nbr_list_.resize(vnum); for (size_t k = 0; k != vnum; ++k) { nbr_list_[k].timestamp.store(std::numeric_limits::max()); @@ -1040,11 +1114,16 @@ class EmptyCsr : public TypedMutableCsrBase { ~EmptyCsr() = default; size_t batch_init(const std::string& name, const std::string& work_dir, - const std::vector& degree, bool batch_init_in_memory, + const std::vector& degree, double reserve_ratio) override { return 0; } + size_t batch_init_in_memory(const std::vector& degree, + double reserve_ratio) override { + return 0; + } + void open(const std::string& name, const std::string& snapshot_dir, const std::string& work_dir) override {} @@ -1100,10 +1179,14 @@ class EmptyCsr ~EmptyCsr() = default; size_t batch_init(const std::string& name, const std::string& work_dir, - 
const std::vector& degree, bool batch_init_in_memory, + const std::vector& degree, double reserve_ratio) override { return 0; } + size_t batch_init_in_memory(const std::vector& degree, + double reserve_ratio) override { + return 0; + } void open(const std::string& name, const std::string& snapshot_dir, const std::string& work_dir) override {} diff --git a/flex/storages/rt_mutable_graph/dual_csr.h b/flex/storages/rt_mutable_graph/dual_csr.h index 6e2fbc70b24a..74dc5578b4e8 100644 --- a/flex/storages/rt_mutable_graph/dual_csr.h +++ b/flex/storages/rt_mutable_graph/dual_csr.h @@ -33,8 +33,13 @@ class DualCsrBase { const std::string& edata_name, const std::string& work_dir, const std::vector& oe_degree, - const std::vector& ie_degree, - bool batch_init_in_memory) = 0; + const std::vector& ie_degree) = 0; + + virtual void BatchInitInMemory(const std::string& edata_name, + const std::string& work_dir, + const std::vector& oe_degree, + const std::vector& ie_degree) = 0; + virtual void Open(const std::string& oe_name, const std::string& ie_name, const std::string& edata_name, const std::string& snapshot_dir, @@ -119,10 +124,17 @@ class DualCsr : public DualCsrBase { void BatchInit(const std::string& oe_name, const std::string& ie_name, const std::string& edata_name, const std::string& work_dir, const std::vector& oe_degree, - const std::vector& ie_degree, - bool batch_init_in_memory) override { - in_csr_->batch_init(ie_name, work_dir, ie_degree, batch_init_in_memory); - out_csr_->batch_init(oe_name, work_dir, oe_degree, batch_init_in_memory); + const std::vector& ie_degree) override { + in_csr_->batch_init(ie_name, work_dir, ie_degree); + out_csr_->batch_init(oe_name, work_dir, oe_degree); + } + + void BatchInitInMemory(const std::string& edata_name, + const std::string& work_dir, + const std::vector& oe_degree, + const std::vector& ie_degree) override { + in_csr_->batch_init_in_memory(ie_degree); + out_csr_->batch_init_in_memory(oe_degree); } void Open(const std::string& 
oe_name, const std::string& ie_name, @@ -251,12 +263,20 @@ class DualCsr : public DualCsrBase { void BatchInit(const std::string& oe_name, const std::string& ie_name, const std::string& edata_name, const std::string& work_dir, const std::vector& oe_degree, - const std::vector& ie_degree, - bool batch_init_in_memory) override { - size_t ie_num = - in_csr_->batch_init(ie_name, work_dir, ie_degree, batch_init_in_memory); - size_t oe_num = out_csr_->batch_init(oe_name, work_dir, oe_degree, - batch_init_in_memory); + const std::vector& ie_degree) override { + size_t ie_num = in_csr_->batch_init(ie_name, work_dir, ie_degree); + size_t oe_num = out_csr_->batch_init(oe_name, work_dir, oe_degree); + column_.open(edata_name, "", work_dir); + column_.resize(std::max(ie_num, oe_num)); + column_idx_.store(0); + } + + void BatchInitInMemory(const std::string& edata_name, + const std::string& work_dir, + const std::vector& oe_degree, + const std::vector& ie_degree) { + size_t ie_num = in_csr_->batch_init_in_memory(ie_degree); + size_t oe_num = out_csr_->batch_init_in_memory(oe_degree); column_.open(edata_name, "", work_dir); column_.resize(std::max(ie_num, oe_num)); column_idx_.store(0); diff --git a/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.cc index 9b9b3ba04163..415a8cd23688 100644 --- a/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.cc @@ -82,10 +82,6 @@ void set_vertex_properties(gs::ColumnBase* col, // TODO(zhanglei): reduce the dummy code here with a template function. 
if (col_type == PropertyType::kBool) { set_single_vertex_column(col, array, vids); - } else if (col_type == PropertyType::kUInt8) { - set_single_vertex_column(col, array, vids); - } else if (col_type == PropertyType::kUInt16) { - set_single_vertex_column(col, array, vids); } else if (col_type == PropertyType::kInt64) { set_single_vertex_column(col, array, vids); } else if (col_type == PropertyType::kInt32) { diff --git a/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h b/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h index 1231407a50d2..8edce97fe2aa 100644 --- a/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h +++ b/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h @@ -19,10 +19,10 @@ #include "flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h" #include "flex/storages/rt_mutable_graph/loader/i_fragment_loader.h" -#include "flex/storages/rt_mutable_graph/loader/load_utils.h" #include "flex/storages/rt_mutable_graph/loading_config.h" #include "flex/storages/rt_mutable_graph/mutable_property_fragment.h" #include "flex/utils/mmap_vector.h" +#include "grape/utils/concurrent_queue.h" #include #include @@ -90,8 +90,7 @@ template struct _add_vertex { #ifndef USE_PTHASH void operator()(const std::shared_ptr& col, - IdIndexer& indexer, std::vector& vids, - std::mutex& mtx) { + IdIndexer& indexer, std::vector& vids) { size_t row_num = col->length(); vid_t vid; if constexpr (!std::is_same::value) { @@ -104,7 +103,6 @@ struct _add_vertex { << col->type()->ToString(); } auto casted_array = std::static_pointer_cast(col); - std::unique_lock lock(mtx); for (size_t i = 0; i < row_num; ++i) { if (!indexer.add(casted_array->Value(i), vid)) { VLOG(2) << "Duplicate vertex id: " << casted_array->Value(i) << ".."; @@ -116,7 +114,6 @@ struct _add_vertex { } else { if (col->type()->Equals(arrow::utf8())) { auto casted_array = std::static_pointer_cast(col); - std::unique_lock lock(mtx); for 
(size_t i = 0; i < row_num; ++i) { auto str = casted_array->GetView(i); std::string_view str_view(str.data(), str.size()); @@ -130,7 +127,6 @@ struct _add_vertex { } else if (col->type()->Equals(arrow::large_utf8())) { auto casted_array = std::static_pointer_cast(col); - std::unique_lock lock(mtx); for (size_t i = 0; i < row_num; ++i) { auto str = casted_array->GetView(i); std::string_view str_view(str.data(), str.size()); @@ -149,7 +145,7 @@ struct _add_vertex { #else void operator()(const std::shared_ptr& col, - PTIndexerBuilder& indexer, std::mutex& mtx) { + PTIndexerBuilder& indexer) { size_t row_num = col->length(); if constexpr (!std::is_same::value) { // for non-string value @@ -161,14 +157,12 @@ struct _add_vertex { << col->type()->ToString(); } auto casted_array = std::static_pointer_cast(col); - std::unique_lock lock(mtx); for (size_t i = 0; i < row_num; ++i) { indexer.add_vertex(casted_array->Value(i)); } } else { if (col->type()->Equals(arrow::utf8())) { auto casted_array = std::static_pointer_cast(col); - std::unique_lock lock(mtx); for (size_t i = 0; i < row_num; ++i) { auto str = casted_array->GetView(i); std::string_view str_view(str.data(), str.size()); @@ -177,7 +171,6 @@ struct _add_vertex { } else if (col->type()->Equals(arrow::large_utf8())) { auto casted_array = std::static_pointer_cast(col); - std::unique_lock lock(mtx); for (size_t i = 0; i < row_num; ++i) { auto str = casted_array->GetView(i); std::string_view str_view(str.data(), str.size()); @@ -333,12 +326,12 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { public: AbstractArrowFragmentLoader(const std::string& work_dir, const Schema& schema, const LoadingConfig& loading_config, - int32_t thread_num, bool batch_init_in_memory, + int32_t thread_num, bool build_csr_in_mem, bool use_mmap_vector) : loading_config_(loading_config), schema_(schema), thread_num_(thread_num), - batch_init_in_memory_(batch_init_in_memory), + build_csr_in_mem_(build_csr_in_mem), 
use_mmap_vector_(use_mmap_vector), basic_fragment_loader_(schema_, work_dir) { vertex_label_num_ = schema_.vertex_label_num(); @@ -383,9 +376,10 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { std::vector vids; vids.reserve(row_num); - - _add_vertex()(primary_key_col, indexer, vids, mtxs_[v_label_id]); - + { + std::unique_lock lock(mtxs_[v_label_id]); + _add_vertex()(primary_key_col, indexer, vids); + } for (size_t j = 0; j < property_cols.size(); ++j) { auto array = property_cols[j]; auto chunked_array = std::make_shared(array); @@ -412,7 +406,8 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { size_t primary_key_ind = std::get<2>(primary_key); IdIndexer indexer; - ConsumerQueue> queue; + grape::BlockingQueue> queue; + queue.SetLimit(1024); std::vector work_threads; for (auto& v_file : v_files) { @@ -421,18 +416,7 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { auto record_batch_supplier_vec = supplier_creator(v_label_id, v_file, loading_config_, std::thread::hardware_concurrency()); - std::atomic finish_reads(0); - work_threads.emplace_back([&]() { - while (true) { - if (finish_reads.load() == - static_cast(record_batch_supplier_vec.size()) && - queue.size() == 0) { - queue.finish(); - break; - } - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - }); + queue.SetProducerNum(record_batch_supplier_vec.size()); for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) { work_threads.emplace_back( @@ -442,7 +426,7 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { while (true) { auto batch = record_batch_supplier->GetNextBatch(); if (!batch) { - finish_reads++; + queue.DecProducerNum(); break; } if (first_batch) { @@ -455,7 +439,7 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { << schema_column_names.size() + 1; first_batch = false; } - queue.push(batch); + queue.Put(batch); } }, idx); @@ -468,10 +452,14 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { 
work_threads.emplace_back( [&](int i) { while (true) { - auto batch = queue.pop(); - if (!batch) { + std::shared_ptr batch{nullptr}; + auto ret = queue.Get(batch); + if (!ret) { break; } + if (!batch) { + LOG(FATAL) << "get nullptr batch"; + } auto columns = batch->columns(); CHECK(primary_key_ind < columns.size()); auto primary_key_column = columns[primary_key_ind]; @@ -513,7 +501,8 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0]; auto primary_key_name = std::get<1>(primary_key); size_t primary_key_ind = std::get<2>(primary_key); - ConsumerQueue> queue; + grape::BlockingQueue> queue; + queue.SetLimit(1024); PTIndexerBuilder indexer_builder; std::vector>> batchs( std::thread::hardware_concurrency()); @@ -524,19 +513,7 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { auto record_batch_supplier_vec = supplier_creator(v_label_id, v_file, loading_config_, std::thread::hardware_concurrency()); - std::atomic finish_reads(0); - work_threads.emplace_back([&]() { - while (true) { - if (finish_reads.load() == - static_cast(record_batch_supplier_vec.size()) && - queue.size() == 0) { - queue.finish(); - break; - } - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - }); - + queue.SetProducerNum(record_batch_supplier_vec.size()); for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) { work_threads.emplace_back( [&](int i) { @@ -545,7 +522,7 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { while (true) { auto batch = record_batch_supplier->GetNextBatch(); if (!batch) { - finish_reads++; + queue.DecProducerNum(); break; } if (first_batch) { @@ -558,7 +535,7 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { << schema_column_names.size() + 1; first_batch = false; } - queue.push(batch); + queue.Put(batch); } }, idx); @@ -568,16 +545,22 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { work_threads.emplace_back( [&](int i) { 
while (true) { - auto batch = queue.pop(); - if (!batch) { + std::shared_ptr batch{nullptr}; + auto ret = queue.Get(batch); + if (!ret) { break; } + if (!batch) { + LOG(FATAL) << "get nullptr batch"; + } batchs[i].emplace_back(batch); auto columns = batch->columns(); CHECK(primary_key_ind < columns.size()); auto primary_key_column = columns[primary_key_ind]; - _add_vertex()(primary_key_column, indexer_builder, - mtxs_[v_label_id]); + { + std::unique_lock lock(mtxs_[v_label_id]); + _add_vertex()(primary_key_column, indexer_builder); + } } }, idx); @@ -761,7 +744,6 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { } else { parsed_edges_vec.resize(std::thread::hardware_concurrency()); } - std::vector> ie_degree(dst_indexer.size()), oe_degree(src_indexer.size()); for (size_t idx = 0; idx < ie_degree.size(); ++idx) { @@ -773,33 +755,23 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { VLOG(10) << "src indexer size: " << src_indexer.size() << " dst indexer size: " << dst_indexer.size(); - ConsumerQueue> queue; + grape::BlockingQueue> queue; + queue.SetLimit(1024); std::vector work_threads; std::vector>> string_columns( std::thread::hardware_concurrency()); - // use a dummy vector to store the string columns, to avoid the strings - // being released as record batch is released. + // use a dummy vector to store the string columns, to avoid the + // strings being released as record batch is released. 
std::vector> string_cols; for (auto filename : e_files) { - std::atomic finish_readers(0); auto record_batch_supplier_vec = supplier_creator(src_label_id, dst_label_id, e_label_id, filename, loading_config_, parsed_edges_vec.size()); - work_threads.emplace_back([&]() { - while (true) { - if (finish_readers.load() == - static_cast(record_batch_supplier_vec.size()) && - queue.size() == 0) { - queue.finish(); - break; - } - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - }); + queue.SetProducerNum(record_batch_supplier_vec.size()); for (size_t i = 0; i < record_batch_supplier_vec.size(); ++i) { work_threads.emplace_back( @@ -810,7 +782,7 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { while (true) { auto record_batch = record_batch_supplier->GetNextBatch(); if (!record_batch) { - finish_readers++; + queue.DecProducerNum(); break; } if (first_batch) { @@ -832,7 +804,7 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { } } - queue.push(record_batch); + queue.Put(record_batch); } }, i); @@ -847,14 +819,17 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { // copy the table to csr. auto& parsed_edges = parsed_edges_vec[idx]; while (true) { - auto record_batch = queue.pop(); - - if (!record_batch) { + std::shared_ptr record_batch{nullptr}; + auto ret = queue.Get(record_batch); + if (!ret) { break; } + if (!record_batch) { + LOG(FATAL) << "get nullptr batch"; + } auto columns = record_batch->columns(); - // We assume the src_col and dst_col will always be put at - // front. + // We assume the src_col and dst_col will always be put + // at front. 
CHECK(columns.size() >= 2); auto src_col = columns[0]; auto dst_col = columns[1]; @@ -921,7 +896,7 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { basic_fragment_loader_.PutEdges( src_label_id, dst_label_id, e_label_id, parsed_edges_vec, ie_deg, - oe_deg, batch_init_in_memory_); + oe_deg, build_csr_in_mem_); string_columns.clear(); size_t sum = 0; for (const auto& edges : parsed_edges_vec) { @@ -936,7 +911,7 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { size_t vertex_label_num_, edge_label_num_; int32_t thread_num_; std::mutex* mtxs_; - bool batch_init_in_memory_; + bool build_csr_in_mem_; bool use_mmap_vector_; mutable BasicFragmentLoader basic_fragment_loader_; }; diff --git a/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h b/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h index 2377841e80ed..86d34efecd00 100644 --- a/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h +++ b/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h @@ -141,15 +141,14 @@ class BasicFragmentLoader { oe_prefix(src_label_name, dst_label_name, edge_label_name), ie_prefix(src_label_name, dst_label_name, edge_label_name), edata_prefix(src_label_name, dst_label_name, edge_label_name), - tmp_dir(work_dir_), {}, {}, false); + tmp_dir(work_dir_), {}, {}); } template void PutEdges(label_t src_label_id, label_t dst_label_id, label_t edge_label_id, const std::vector& edges_vec, const std::vector& ie_degree, - const std::vector& oe_degree, - bool batch_init_in_memory) { + const std::vector& oe_degree, bool build_csr_in_mem) { size_t index = src_label_id * vertex_label_num_ * edge_label_num_ + dst_label_id * edge_label_num_ + edge_label_id; auto& src_indexer = lf_indexers_[src_label_id]; @@ -175,11 +174,17 @@ class BasicFragmentLoader { oe_[index] = dual_csr_list_[index]->GetOutCsr(); CHECK(ie_degree.size() == dst_indexer.size()); CHECK(oe_degree.size() == src_indexer.size()); - dual_csr->BatchInit( - 
oe_prefix(src_label_name, dst_label_name, edge_label_name), - ie_prefix(src_label_name, dst_label_name, edge_label_name), - edata_prefix(src_label_name, dst_label_name, edge_label_name), - tmp_dir(work_dir_), oe_degree, ie_degree, batch_init_in_memory); + if (build_csr_in_mem) { + dual_csr->BatchInitInMemory( + edata_prefix(src_label_name, dst_label_name, edge_label_name), + tmp_dir(work_dir_), oe_degree, ie_degree); + } else { + dual_csr->BatchInit( + oe_prefix(src_label_name, dst_label_name, edge_label_name), + ie_prefix(src_label_name, dst_label_name, edge_label_name), + edata_prefix(src_label_name, dst_label_name, edge_label_name), + tmp_dir(work_dir_), oe_degree, ie_degree); + } std::vector work_threads; for (size_t i = 0; i < edges_vec.size(); ++i) { work_threads.emplace_back( @@ -227,11 +232,19 @@ class BasicFragmentLoader { oe_[index] = dual_csr_list_[index]->GetOutCsr(); CHECK(ie_degree.size() == dst_indexer.size()); CHECK(oe_degree.size() == src_indexer.size()); - dual_csr->BatchInit( - oe_prefix(src_label_name, dst_label_name, edge_label_name), - ie_prefix(src_label_name, dst_label_name, edge_label_name), - edata_prefix(src_label_name, dst_label_name, edge_label_name), - tmp_dir(work_dir_), oe_degree, ie_degree, batch_init_in_memory); + + if (build_csr_in_mem) { + dual_csr->BatchInitInMemory( + edata_prefix(src_label_name, dst_label_name, edge_label_name), + tmp_dir(work_dir_), oe_degree, ie_degree); + } else { + dual_csr->BatchInit( + oe_prefix(src_label_name, dst_label_name, edge_label_name), + ie_prefix(src_label_name, dst_label_name, edge_label_name), + edata_prefix(src_label_name, dst_label_name, edge_label_name), + tmp_dir(work_dir_), oe_degree, ie_degree); + } + std::vector work_threads; for (size_t i = 0; i < edges_vec.size(); ++i) { work_threads.emplace_back( diff --git a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc index 5f02e56b1f53..6e7d0e4873eb 100644 --- 
a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc @@ -231,10 +231,10 @@ static void put_column_names_option(const LoadingConfig& loading_config, std::shared_ptr CSVFragmentLoader::Make( const std::string& work_dir, const Schema& schema, const LoadingConfig& loading_config, int32_t thread_num, - bool batch_init_in_memory, bool use_mmap_vector) { + bool build_csr_in_mem, bool use_mmap_vector) { return std::shared_ptr( new CSVFragmentLoader(work_dir, schema, loading_config, thread_num, - batch_init_in_memory, use_mmap_vector)); + build_csr_in_mem, use_mmap_vector)); } void CSVFragmentLoader::addVertices(label_t v_label_id, diff --git a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h index be925cd9a464..0127cb150095 100644 --- a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h +++ b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h @@ -67,15 +67,15 @@ class CSVFragmentLoader : public AbstractArrowFragmentLoader { public: CSVFragmentLoader(const std::string& work_dir, const Schema& schema, const LoadingConfig& loading_config, int32_t thread_num, - bool batch_init_in_memory, bool use_mmap_vector) + bool build_csr_in_mem, bool use_mmap_vector) : AbstractArrowFragmentLoader(work_dir, schema, loading_config, - thread_num, batch_init_in_memory, + thread_num, build_csr_in_mem, use_mmap_vector) {} static std::shared_ptr Make( const std::string& work_dir, const Schema& schema, const LoadingConfig& loading_config, int32_t thread_num, - bool batch_init_in_memory, bool use_mmap_vector); + bool build_csr_in_mem, bool use_mmap_vector); ~CSVFragmentLoader() {} diff --git a/flex/storages/rt_mutable_graph/loader/load_utils.h b/flex/storages/rt_mutable_graph/loader/load_utils.h deleted file mode 100644 index 7ee31dd450a9..000000000000 --- a/flex/storages/rt_mutable_graph/loader/load_utils.h +++ /dev/null @@ -1,79 
+0,0 @@ -/** Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef STORAGES_RT_MUTABLE_GRAPH_LOADER_LOADER_UTILS_H_ -#define STORAGES_RT_MUTABLE_GRAPH_LOADER_LOADER_UTILS_H_ -#include -#include -#include - -// A simple queue which stores the record batches, for consuming. -template -struct ConsumerQueue { - public: - ConsumerQueue(int32_t max_length = 2048) - : max_length_(max_length), finished_(false) {} - - void push(const EDATA_T& data) { - std::unique_lock lock(mutex_); - VLOG(10) << "Try Pushing item to queue, size: " << queue_.size() - << ", max_length_: " << max_length_; - full_cv_.wait(lock, - [this] { return queue_.size() < max_length_ || finished_; }); - if (finished_) { - return; - } - VLOG(10) << "Pushing item to queue, size: " << queue_.size() - << ", max_length_: " << max_length_; - queue_.push(data); - empty_cv_.notify_one(); - } - - EDATA_T pop() { - std::unique_lock lock(mutex_); - VLOG(10) << "Try Popping item from queue, size: " << queue_.size() - << ", max_length_: " << max_length_; - empty_cv_.wait(lock, [this] { return !queue_.empty() || finished_; }); - if (queue_.empty()) { - return nullptr; - } - VLOG(10) << "Popping item from queue, size: " << queue_.size() - << ", max_length_: " << max_length_; - auto data = queue_.front(); - queue_.pop(); - full_cv_.notify_one(); - return data; - } - - size_t size() const { - std::unique_lock lock(mutex_); - return queue_.size(); - } - - void 
finish() { - std::unique_lock lock(mutex_); - finished_ = true; - empty_cv_.notify_all(); - full_cv_.notify_all(); - } - - private: - std::queue queue_; - mutable std::mutex mutex_; - std::condition_variable full_cv_; - std::condition_variable empty_cv_; - size_t max_length_; - bool finished_; -}; -#endif // STORAGES_RT_MUTABLE_GRAPH_LOADER_LOADER_UTILS_H_ diff --git a/flex/storages/rt_mutable_graph/loader/loader_factory.cc b/flex/storages/rt_mutable_graph/loader/loader_factory.cc index 279b05f48bef..9d257309cc94 100644 --- a/flex/storages/rt_mutable_graph/loader/loader_factory.cc +++ b/flex/storages/rt_mutable_graph/loader/loader_factory.cc @@ -46,8 +46,8 @@ void LoaderFactory::Finalize() {} std::shared_ptr LoaderFactory::CreateFragmentLoader( const std::string& work_dir, const Schema& schema, - const LoadingConfig& loading_config, int thread_num, - bool batch_init_in_memory, bool use_mmap_vector) { + const LoadingConfig& loading_config, int thread_num, bool build_csr_in_mem, + bool use_mmap_vector) { auto scheme = loading_config.GetScheme(); auto format = loading_config.GetFormat(); auto key = scheme + format; @@ -55,7 +55,7 @@ std::shared_ptr LoaderFactory::CreateFragmentLoader( auto iter = known_loaders_.find(key); if (iter != known_loaders_.end()) { return iter->second(work_dir, schema, loading_config, thread_num, - batch_init_in_memory, use_mmap_vector); + build_csr_in_mem, use_mmap_vector); } else { LOG(FATAL) << "Unsupported format: " << format; } diff --git a/flex/storages/rt_mutable_graph/loader/loader_factory.h b/flex/storages/rt_mutable_graph/loader/loader_factory.h index 4dc8db2dc3f1..95034482a58a 100644 --- a/flex/storages/rt_mutable_graph/loader/loader_factory.h +++ b/flex/storages/rt_mutable_graph/loader/loader_factory.h @@ -31,7 +31,7 @@ class LoaderFactory { using loader_initializer_t = std::shared_ptr (*)( const std::string& work_dir, const Schema& schema, const LoadingConfig& loading_config, int thread_num, - bool batch_init_in_memory, bool 
use_mmap_vector); + bool build_csr_in_mem, bool use_mmap_vector); static void Init(); @@ -40,7 +40,7 @@ class LoaderFactory { static std::shared_ptr CreateFragmentLoader( const std::string& work_dir, const Schema& schema, const LoadingConfig& loading_config, int thread_num, - bool batch_init_in_memory, bool use_mmap_vector); + bool build_csr_in_mem, bool use_mmap_vector); static bool Register(const std::string& scheme_type, const std::string& format_type, diff --git a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc index 42d913cd856f..18c1a73baf82 100644 --- a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc @@ -342,10 +342,10 @@ ODPSTableRecordBatchSupplier::GetNextBatch() { std::shared_ptr ODPSFragmentLoader::Make( const std::string& work_dir, const Schema& schema, const LoadingConfig& loading_config, int32_t thread_num, - bool batch_init_in_memory, bool use_mmap_vector) { + bool build_csr_in_mem, bool use_mmap_vector) { return std::shared_ptr( new ODPSFragmentLoader(work_dir, schema, loading_config, thread_num, - batch_init_in_memory, use_mmap_vector)); + build_csr_in_mem, use_mmap_vector)); } void ODPSFragmentLoader::init() { odps_read_client_.init(); } diff --git a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h index 305128278df9..8b08b140ed4d 100644 --- a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h +++ b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h @@ -172,15 +172,15 @@ class ODPSFragmentLoader : public AbstractArrowFragmentLoader { public: ODPSFragmentLoader(const std::string& work_dir, const Schema& schema, const LoadingConfig& loading_config, int32_t thread_num, - bool batch_init_in_memory, bool use_mmap_vector) + bool build_csr_in_mem, bool use_mmap_vector) : 
AbstractArrowFragmentLoader(work_dir, schema, loading_config, - thread_num, batch_init_in_memory, + thread_num, build_csr_in_mem, use_mmap_vector) {} static std::shared_ptr Make( const std::string& work_dir, const Schema& schema, const LoadingConfig& loading_config, int32_t thread_num, - bool batch_init_in_memory, bool); + bool build_csr_in_mem, bool); ~ODPSFragmentLoader() {} diff --git a/flex/tests/hqps/admin_http_test.cc b/flex/tests/hqps/admin_http_test.cc index 778a8b30d58a..8ea2e4a3895a 100644 --- a/flex/tests/hqps/admin_http_test.cc +++ b/flex/tests/hqps/admin_http_test.cc @@ -238,36 +238,6 @@ gs::GraphId run_graph_tests(httplib::Client& cli, LOG(FATAL) << "Empty response: "; } LOG(INFO) << "load graph response: " << body; - //---4. wait until loading done----------------------- - nlohmann::json job_id_resp = nlohmann::json::parse(body); - if (!job_id_resp.contains("job_id")) { - LOG(FATAL) << "load graph response does not contain job_id: " << body; - } - auto job_id = job_id_resp["job_id"].get(); - while (true) { - res = cli.Get("/v1/job/" + job_id); - if (res->status != 200) { - LOG(FATAL) << "get job status failed: " << res->body; - } - body = res->body; - if (body.empty()) { - LOG(FATAL) << "Empty response: "; - } - LOG(INFO) << "get job status response: " << body; - nlohmann::json job_resp = nlohmann::json::parse(body); - if (!job_resp.contains("status")) { - LOG(FATAL) << "get job status response does not contain status: " << body; - } - auto job_status = job_resp["status"].get(); - if (job_status == "SUCCESS") { - break; - } else if (job_status == "RUNNING") { - std::this_thread::sleep_for(std::chrono::seconds(1)); - } else { - LOG(FATAL) << "job status is not FINISHED or RUNNING: " << body; - } - } - LOG(INFO) << "Bulk loading done"; return graph_id; } diff --git a/flex/utils/arrow_utils.cc b/flex/utils/arrow_utils.cc index a49866f1a854..220f96881f1c 100644 --- a/flex/utils/arrow_utils.cc +++ b/flex/utils/arrow_utils.cc @@ -18,10 +18,6 @@ namespace 
gs { std::shared_ptr PropertyTypeToArrowType(PropertyType type) { if (type == PropertyType::Bool()) { return arrow::boolean(); - } else if (type == PropertyType::UInt8()) { - return arrow::uint8(); - } else if (type == PropertyType::UInt16()) { - return arrow::uint16(); } else if (type == PropertyType::Int32()) { return arrow::int32(); } else if (type == PropertyType::Int64()) { diff --git a/flex/utils/arrow_utils.h b/flex/utils/arrow_utils.h index 92a5c255217d..e6436e7cad06 100644 --- a/flex/utils/arrow_utils.h +++ b/flex/utils/arrow_utils.h @@ -297,26 +297,6 @@ struct TypeConverter { } }; -template <> -struct TypeConverter { - static PropertyType property_type() { return PropertyType::kUInt8; } - using ArrowType = arrow::UInt8Type; - using ArrowArrayType = arrow::UInt8Array; - static std::shared_ptr ArrowTypeValue() { - return arrow::uint8(); - } -}; - -template <> -struct TypeConverter { - static PropertyType property_type() { return PropertyType::kUInt16; } - using ArrowType = arrow::UInt16Type; - using ArrowArrayType = arrow::UInt16Array; - static std::shared_ptr ArrowTypeValue() { - return arrow::uint16(); - } -}; - std::shared_ptr PropertyTypeToArrowType(PropertyType type); } // namespace gs diff --git a/flex/utils/id_indexer.h b/flex/utils/id_indexer.h index 87977c7c7497..4e15df69f9f1 100644 --- a/flex/utils/id_indexer.h +++ b/flex/utils/id_indexer.h @@ -1008,7 +1008,6 @@ void build_lf_indexer(const IdIndexer& input, residuals.push_back(input.indices_[idx]); } } - for (const auto& lid : residuals) { auto oid = input.keys_[lid]; size_t index = input.hash_policy_.index_for_hash( diff --git a/flex/utils/mmap_array.h b/flex/utils/mmap_array.h index 535e7bb1b6b1..7f915a57b653 100644 --- a/flex/utils/mmap_array.h +++ b/flex/utils/mmap_array.h @@ -98,10 +98,18 @@ class mmap_array { } fd_ = -1; } - filename_ = ""; sync_to_file_ = false; } + void unlink() { + if (filename_ != "" && std::filesystem::exists(filename_)) { + if (std::filesystem::remove(filename_) == 0) 
{ + LOG(FATAL) << "Failed to remove file [ " << filename_ << " ] " + << strerror(errno); + } + } + } + void set_hugepage_prefered(bool val) { hugepage_prefered_ = (val && !sync_to_file_); } diff --git a/flex/utils/mmap_vector.h b/flex/utils/mmap_vector.h index ba382ecf98ee..1fa5deebc564 100644 --- a/flex/utils/mmap_vector.h +++ b/flex/utils/mmap_vector.h @@ -21,49 +21,45 @@ namespace gs { template class mmap_vector { public: - mmap_vector(const std::string& work_dir, const std::string& file_name) - : work_dir_(work_dir), file_name_(file_name) { - array_.open(work_dir + "/" + file_name_, true); + mmap_vector(const std::string& work_dir, const std::string& file_name) { + array_.open(work_dir + "/" + file_name, true); array_.resize(4096); size_ = 0; - cap_ = 4096; } ~mmap_vector() { array_.reset(); - unlink((work_dir_ + "/" + file_name_).c_str()); + array_.unlink(); } mmap_vector(mmap_vector&& other) { array_.swap(other.array_); size_ = other.size_; - cap_ = other.cap_; - file_name_.swap(other.file_name_); - work_dir_.swap(other.work_dir_); } void push_back(const EDATA_T& val) { - if (size_ == cap_) { - array_.resize(cap_ * 2); - cap_ = cap_ * 2; + size_t cap = array_.size(); + if (size_ == cap) { + array_.resize(cap * 2); } array_.set(size_, val); ++size_; } void emplace_back(EDATA_T&& val) { - if (size_ == cap_) { - array_.resize(cap_ * 2); - cap_ = cap_ * 2; + size_t cap = array_.size(); + if (size_ == cap) { + array_.resize(cap * 2); } array_.set(size_, val); ++size_; } void resize(size_t size) { - while (size > cap_) { - cap_ *= 2; - array_.resize(cap_); + size_t cap = array_.size(); + while (size > cap) { + cap *= 2; + array_.resize(cap); } size_ = size; } @@ -75,17 +71,11 @@ class mmap_vector { const EDATA_T* begin() const { return array_.data(); } const EDATA_T* end() const { return array_.data() + size_; } - void clear() { - size_ = 0; - cap_ = 0; - } + void clear() { size_ = 0; } private: mmap_array array_; - std::string work_dir_; - std::string file_name_; 
size_t size_; - size_t cap_; }; }; // namespace gs #endif // GRAPHSCOPE_UTILS_MMAP_VECTOR_H_ diff --git a/flex/utils/property/column.h b/flex/utils/property/column.h index 85d27e4f765e..67c4e87e29ea 100644 --- a/flex/utils/property/column.h +++ b/flex/utils/property/column.h @@ -18,6 +18,7 @@ #include #include +#include "grape/utils/concurrent_queue.h" #include "flex/utils/mmap_array.h" #include "flex/utils/property/types.h" @@ -504,6 +505,7 @@ class StringMapColumn : public ColumnBase { private: TypedColumn index_col_; LFIndexer* meta_map_; + grape::SpinLock lock_; }; template @@ -547,7 +549,11 @@ void StringMapColumn::set_value(size_t idx, const std::string_view& val) { INDEX_T lid; if (!meta_map_->get_index(val, lid)) { - lid = meta_map_->insert(val); + lock_.lock(); + if (!meta_map_->get_index(val, lid)) { + lid = meta_map_->insert(val); + } + lock_.unlock(); } index_col_.set_value(idx, lid); }