Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
liulx20 committed Jun 19, 2024
1 parent dc04cb2 commit 2e0fc70
Show file tree
Hide file tree
Showing 22 changed files with 329 additions and 315 deletions.
21 changes: 10 additions & 11 deletions flex/bin/bulk_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,19 @@ void signal_handler(int signal) {
int main(int argc, char** argv) {
bpo::options_description desc("Usage:");
/**
* When reading the edges of a graph, there are two stages involved.
* When loading the edges of a graph, there are two stages involved.
*
* The first stage involves reading the edges into a temporary vector and
* acquiring information on the degrees of the vertices.
* The second stage then constructs the CSR using the degree information.
*
* During the first stage, the edges are stored in the form of triplets, which
* can lead to a certain amount of memory expansion, so the `use_mmap_vector`
* can lead to a certain amount of memory expansion, so the `use-mmap-vector`
* option is provided. mmap_vector uses mmap to map files, which supports
* swapping memory to disk at runtime.
*
* Constructing the CSR involves random reads and writes,we offer the
* `batch_init_in_memory` option, which allows CSR to be built in-memory to
* Constructing the CSR involves random reads and writes, so we offer the
* `build-csr-in-mem` option, which allows the CSR to be built in memory to
* avoid extensive random disk reads and writes.
*
*/
Expand All @@ -70,7 +70,7 @@ int main(int argc, char** argv) {
"data-path,d", bpo::value<std::string>(), "data directory path")(
"graph-config,g", bpo::value<std::string>(), "graph schema config file")(
"bulk-load,l", bpo::value<std::string>(), "bulk-load config file")(
"memory-batch-init,m", bpo::value<bool>(), "batch init in memory")(
"build-csr-in-mem,m", bpo::value<bool>(), "build csr in memory")(
"use-mmap-vector", bpo::value<bool>(), "use mmap vector");

google::InitGoogleLogging(argv[0]);
Expand Down Expand Up @@ -110,11 +110,10 @@ int main(int argc, char** argv) {
return -1;
}
bulk_load_config_path = vm["bulk-load"].as<std::string>();
bool batch_init_in_memory = false;
if (vm.count("memory-batch-init")) {
batch_init_in_memory = vm["memory-batch-init"].as<bool>();
LOG(INFO) << "batch init in memory: "
<< static_cast<int>(batch_init_in_memory);
// `--build-csr-in-mem` is optional and defaults to false when not supplied.
bool build_csr_in_mem = false;
if (vm.count("build-csr-in-mem")) {
  build_csr_in_mem = vm["build-csr-in-mem"].as<bool>();
  // Log label follows the option's help text ("build csr in memory") so the
  // log line can be matched to the flag that produced it.
  LOG(INFO) << "build csr in memory: " << static_cast<int>(build_csr_in_mem);
}

bool use_mmap_vector = false;
Expand Down Expand Up @@ -165,7 +164,7 @@ int main(int argc, char** argv) {

auto loader = gs::LoaderFactory::CreateFragmentLoader(
data_dir_path.string(), schema_res.value(), loading_config_res.value(),
parallelism, batch_init_in_memory, use_mmap_vector);
parallelism, build_csr_in_mem, use_mmap_vector);
loader->LoadFragment();

t += grape::GetCurrentTime();
Expand Down
5 changes: 4 additions & 1 deletion flex/storages/rt_mutable_graph/csr/csr_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ class CsrBase {
virtual size_t batch_init(const std::string& name,
const std::string& work_dir,
const std::vector<int>& degree,
bool batch_init_in_memory = false,
double reserve_ratio = 1.2) = 0;

// Counterpart of batch_init that takes no file name or working directory.
// `degree` is read as one entry per vertex by the implementations in this
// change; `reserve_ratio` defaults to 1.2. The meaning of the returned
// size_t is defined by each implementation.
virtual size_t batch_init_in_memory(const std::vector<int>& degree,
double reserve_ratio = 1.2) = 0;

// Default implementation for CSR types that do not override it: the
// timestamp is ignored and "not supported..." is logged at FATAL severity.
virtual void batch_sort_by_edge_data(timestamp_t ts) {
  (void) ts;  // unused by the default implementation
  LOG(FATAL) << "not supported...";
}
Expand Down
75 changes: 65 additions & 10 deletions flex/storages/rt_mutable_graph/csr/immutable_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,55 @@ class ImmutableCsr : public TypedImmutableCsrBase<EDATA_T> {
using slice_t = ImmutableNbrSlice<EDATA_T>;

// Opens the three backing arrays of this CSR as files under `work_dir`
// (`<name>.adj`, `<name>.nbr`, `<name>.deg`) and sizes them from `degree`.
//
// @param name           file-name stem shared by the three arrays
// @param work_dir       directory the files are opened in
// @param degree         one entry per vertex: number of neighbor slots to
//                       lay out for that vertex
// @param reserve_ratio  accepted for interface compatibility; not used here
// @return total number of neighbor slots (sum of `degree`)
//
// The signature carries no in-memory flag: the in-memory path is the separate
// batch_init_in_memory(), and the sibling batch_init overrides take exactly
// these four parameters.
size_t batch_init(const std::string& name, const std::string& work_dir,
                  const std::vector<int>& degree,
                  double reserve_ratio) override {
  const size_t vnum = degree.size();
  const std::string prefix = work_dir + "/" + name;

  adj_lists_.open(prefix + ".adj", true);
  adj_lists_.resize(vnum);

  size_t edge_num = 0;
  for (auto d : degree) {
    edge_num += d;
  }

  nbr_list_.open(prefix + ".nbr", true);
  nbr_list_.resize(edge_num);

  degree_list_.open(prefix + ".deg", true);
  degree_list_.resize(vnum);

  // Hand each vertex a contiguous slice of nbr_list_; vertices with no
  // neighbors get a null slice. Every degree counter starts at zero.
  nbr_t* ptr = nbr_list_.data();
  for (vid_t i = 0; i < vnum; ++i) {
    const int deg = degree[i];
    adj_lists_[i] = (deg != 0) ? ptr : nullptr;
    ptr += deg;

    degree_list_[i] = 0;
  }

  unsorted_since_ = 0;
  return edge_num;
}

size_t batch_init_in_memory(const std::vector<int>& degree,
double reserve_ratio) override {
size_t vnum = degree.size();
adj_lists_.reset();
adj_lists_.resize(vnum);

size_t edge_num = 0;
for (auto d : degree) {
edge_num += d;
}

nbr_list_.open(work_dir + "/" + name + ".nbr", !batch_init_in_memory);
nbr_list_.reset();
nbr_list_.resize(edge_num);

degree_list_.open(work_dir + "/" + name + ".deg", !batch_init_in_memory);
degree_list_.reset();
degree_list_.resize(vnum);

nbr_t* ptr = nbr_list_.data();
Expand Down Expand Up @@ -290,10 +324,21 @@ class SingleImmutableCsr : public TypedImmutableCsrBase<EDATA_T> {
~SingleImmutableCsr() {}

size_t batch_init(const std::string& name, const std::string& work_dir,
const std::vector<int>& degree, bool batch_init_in_memory,
const std::vector<int>& degree,
double reserve_ratio) override {
size_t vnum = degree.size();
nbr_list_.open(work_dir + "/" + name + ".snbr", !batch_init_in_memory);
nbr_list_.open(work_dir + "/" + name + ".snbr", true);
nbr_list_.resize(vnum);
for (size_t k = 0; k != vnum; ++k) {
nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
}
return vnum;
}

size_t batch_init_in_memory(const std::vector<int>& degree,
double reserve_ratio) override {
size_t vnum = degree.size();
nbr_list_.reset();
nbr_list_.resize(vnum);
for (size_t k = 0; k != vnum; ++k) {
nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
Expand Down Expand Up @@ -441,8 +486,6 @@ class SingleImmutableCsr : public TypedImmutableCsrBase<EDATA_T> {

// Returns a reference to the entry of nbr_list_ at index `i`. This method
// adds no bounds check of its own.
const nbr_t& get_edge(vid_t i) const {
  const nbr_t& edge = nbr_list_[i];
  return edge;
}

void close() override { nbr_list_.reset(); }

private:
mmap_array<nbr_t> nbr_list_;
};
Expand All @@ -458,10 +501,21 @@ class SingleImmutableCsr<std::string_view>
~SingleImmutableCsr() {}

size_t batch_init(const std::string& name, const std::string& work_dir,
const std::vector<int>& degree, bool batch_init_in_memory,
const std::vector<int>& degree,
double reserve_ratio) override {
size_t vnum = degree.size();
nbr_list_.open(work_dir + "/" + name + ".snbr", !batch_init_in_memory);
nbr_list_.open(work_dir + "/" + name + ".snbr", true);
nbr_list_.resize(vnum);
for (size_t k = 0; k != vnum; ++k) {
nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
}
return vnum;
}

size_t batch_init_in_memory(const std::vector<int>& degree,
double reserve_ratio) override {
size_t vnum = degree.size();
nbr_list_.reset();
nbr_list_.resize(vnum);
for (size_t k = 0; k != vnum; ++k) {
nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max();
Expand Down Expand Up @@ -613,6 +667,7 @@ class SingleImmutableCsr<std::string_view>
return nbr;
}
// Resets nbr_list_; no other member is touched.
void close() override {
  nbr_list_.reset();
}

private:
StringColumn& column_;
mmap_array<nbr_t> nbr_list_;
Expand Down
107 changes: 95 additions & 12 deletions flex/storages/rt_mutable_graph/csr/mutable_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,39 @@ class MutableCsr : public TypedMutableCsrBase<EDATA_T> {
}

size_t batch_init(const std::string& name, const std::string& work_dir,
const std::vector<int>& degree, bool batch_init_in_memory,
const std::vector<int>& degree,
double reserve_ratio) override {
reserve_ratio = std::max(reserve_ratio, 1.0);
size_t vnum = degree.size();
adj_lists_.open(work_dir + "/" + name + ".adj", !batch_init_in_memory);
adj_lists_.open(work_dir + "/" + name + ".adj", true);
adj_lists_.resize(vnum);

locks_ = new grape::SpinLock[vnum];

size_t edge_num = 0;
for (auto d : degree) {
edge_num += (std::ceil(d * reserve_ratio));
}
nbr_list_.open(work_dir + "/" + name + ".nbr", true);
nbr_list_.resize(edge_num);

nbr_t* ptr = nbr_list_.data();
for (vid_t i = 0; i < vnum; ++i) {
int deg = degree[i];
int cap = std::ceil(deg * reserve_ratio);
adj_lists_[i].init(ptr, cap, 0);
ptr += cap;
}

unsorted_since_ = 0;
return edge_num;
}

size_t batch_init_in_memory(const std::vector<int>& degree,
double reserve_ratio) override {
reserve_ratio = std::max(reserve_ratio, 1.0);
size_t vnum = degree.size();
adj_lists_.reset();
adj_lists_.resize(vnum);

locks_ = new grape::SpinLock[vnum];
Expand All @@ -162,7 +190,7 @@ class MutableCsr : public TypedMutableCsrBase<EDATA_T> {
for (auto d : degree) {
edge_num += (std::ceil(d * reserve_ratio));
}
nbr_list_.open(work_dir + "/" + name + ".nbr", !batch_init_in_memory);
nbr_list_.reset();
nbr_list_.resize(edge_num);

nbr_t* ptr = nbr_list_.data();
Expand Down Expand Up @@ -474,10 +502,10 @@ class MutableCsr<std::string_view>
}

size_t batch_init(const std::string& name, const std::string& work_dir,
const std::vector<int>& degree, bool batch_init_in_memory,
const std::vector<int>& degree,
double reserve_ratio) override {
size_t vnum = degree.size();
adj_lists_.open(work_dir + "/" + name + ".adj", !batch_init_in_memory);
adj_lists_.open(work_dir + "/" + name + ".adj", true);
adj_lists_.resize(vnum);

locks_ = new grape::SpinLock[vnum];
Expand All @@ -486,7 +514,31 @@ class MutableCsr<std::string_view>
for (auto d : degree) {
edge_num += d;
}
nbr_list_.open(work_dir + "/" + name + ".nbr", !batch_init_in_memory);
nbr_list_.open(work_dir + "/" + name + ".nbr", true);
nbr_list_.resize(edge_num);

nbr_t* ptr = nbr_list_.data();
for (vid_t i = 0; i < vnum; ++i) {
int deg = degree[i];
adj_lists_[i].init(ptr, deg, 0);
ptr += deg;
}
return edge_num;
}

size_t batch_init_in_memory(const std::vector<int>& degree,
double reserve) override {
size_t vnum = degree.size();
adj_lists_.reset();
adj_lists_.resize(vnum);

locks_ = new grape::SpinLock[vnum];

size_t edge_num = 0;
for (auto d : degree) {
edge_num += d;
}
nbr_list_.reset();
nbr_list_.resize(edge_num);

nbr_t* ptr = nbr_list_.data();
Expand Down Expand Up @@ -686,10 +738,21 @@ class SingleMutableCsr : public TypedMutableCsrBase<EDATA_T> {
~SingleMutableCsr() {}

size_t batch_init(const std::string& name, const std::string& work_dir,
const std::vector<int>& degree, bool batch_init_in_memory,
const std::vector<int>& degree,
double reserve_ratio) override {
size_t vnum = degree.size();
nbr_list_.open(work_dir + "/" + name + ".snbr", !batch_init_in_memory);
nbr_list_.open(work_dir + "/" + name + ".snbr", true);
nbr_list_.resize(vnum);
for (size_t k = 0; k != vnum; ++k) {
nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
}
return vnum;
}

size_t batch_init_in_memory(const std::vector<int>& degree,
double reserve_ratio) override {
size_t vnum = degree.size();
nbr_list_.reset();
nbr_list_.resize(vnum);
for (size_t k = 0; k != vnum; ++k) {
nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
Expand Down Expand Up @@ -868,10 +931,21 @@ class SingleMutableCsr<std::string_view>
~SingleMutableCsr() {}

size_t batch_init(const std::string& name, const std::string& work_dir,
const std::vector<int>& degree, bool batch_init_in_memory,
const std::vector<int>& degree,
double reserve_ratio) override {
size_t vnum = degree.size();
nbr_list_.open(work_dir + "/" + name + ".snbr", !batch_init_in_memory);
nbr_list_.open(work_dir + "/" + name + ".snbr", true);
nbr_list_.resize(vnum);
for (size_t k = 0; k != vnum; ++k) {
nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
}
return vnum;
}

size_t batch_init_in_memory(const std::vector<int>& degree,
double reserve_ratio) override {
size_t vnum = degree.size();
nbr_list_.reset();
nbr_list_.resize(vnum);
for (size_t k = 0; k != vnum; ++k) {
nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
Expand Down Expand Up @@ -1040,11 +1114,16 @@ class EmptyCsr : public TypedMutableCsrBase<EDATA_T> {
~EmptyCsr() = default;

size_t batch_init(const std::string& name, const std::string& work_dir,
const std::vector<int>& degree, bool batch_init_in_memory,
const std::vector<int>& degree,
double reserve_ratio) override {
return 0;
}

// Allocates nothing: both arguments are ignored and the reported size is
// always zero.
size_t batch_init_in_memory(const std::vector<int>& degree,
                            double reserve_ratio) override {
  (void) degree;
  (void) reserve_ratio;
  return size_t{0};
}

// No-op: none of the three paths is read.
void open(const std::string& name, const std::string& snapshot_dir,
          const std::string& work_dir) override {
}

Expand Down Expand Up @@ -1100,10 +1179,14 @@ class EmptyCsr<std::string_view>
~EmptyCsr() = default;

size_t batch_init(const std::string& name, const std::string& work_dir,
const std::vector<int>& degree, bool batch_init_in_memory,
const std::vector<int>& degree,
double reserve_ratio) override {
return 0;
}
// Allocates nothing: both arguments are ignored and the reported size is
// always zero.
size_t batch_init_in_memory(const std::vector<int>& degree,
                            double reserve_ratio) override {
  (void) degree;
  (void) reserve_ratio;
  return size_t{0};
}

// No-op: none of the three paths is read.
void open(const std::string& name, const std::string& snapshot_dir,
          const std::string& work_dir) override {
}
Expand Down
Loading

0 comments on commit 2e0fc70

Please sign in to comment.