Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flex): Configure the memory strategy with memory-level. #3511

Merged
merged 5 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ option(BUILD_TEST "Whether to build test" ON)
option(BUILD_DOC "Whether to build doc" ON)
option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" ON)
option(MONITOR_SESSIONS "Whether monitor sessions" OFF)
option(ENABLE_HUGEPAGE "Whether to use hugepages when open mmap array in memory" OFF)

#print options
message(STATUS "Build HighQPS Engine: ${BUILD_HQPS}")
Expand Down Expand Up @@ -49,12 +48,6 @@ if (MONITOR_SESSIONS)
add_definitions(-DMONITOR_SESSIONS)
endif ()


if (ENABLE_HUGEPAGE)
message("Hugepage is enabled")
add_definitions(-DHUGEPAGE)
endif ()

execute_process(COMMAND uname -r OUTPUT_VARIABLE LINUX_KERNEL_VERSION)
string(STRIP ${LINUX_KERNEL_VERSION} LINUX_KERNEL_VERSION)
message(${LINUX_KERNEL_VERSION})
Expand Down
18 changes: 4 additions & 14 deletions flex/bin/rt_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ int main(int argc, char** argv) {
"graph schema config file")(
"data-path,d", bpo::value<std::string>(), "data directory path")(
"warmup,w", bpo::value<bool>()->default_value(false),
"warmup graph data")("memory-only,m",
bpo::value<bool>()->default_value(true));
"warmup graph data")("memory-level,m",
bpo::value<int>()->default_value(1));
google::InitGoogleLogging(argv[0]);
FLAGS_logtostderr = true;

Expand All @@ -58,7 +58,7 @@ int main(int argc, char** argv) {

bool enable_dpdk = false;
bool warmup = vm["warmup"].as<bool>();
bool memory_only = vm["memory-only"].as<bool>();
int memory_level = vm["memory-level"].as<int>();
uint32_t shard_num = vm["shard-num"].as<uint32_t>();
uint16_t http_port = vm["http-port"].as<uint16_t>();

Expand All @@ -84,17 +84,7 @@ int main(int argc, char** argv) {

auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
gs::GraphDBConfig config(schema, data_path, shard_num);
#ifdef HUGEPAGE
config.allocator_strategy = gs::MemoryStrategy::kHugepagePrefered;
config.vertex_map_strategy = gs::MemoryStrategy::kHugepagePrefered;
config.vertex_table_strategy = gs::MemoryStrategy::kHugepagePrefered;
config.topology_strategy = gs::MemoryStrategy::kHugepagePrefered;
#else
config.allocator_strategy = gs::MemoryStrategy::kMemoryOnly;
config.vertex_map_strategy = gs::MemoryStrategy::kMemoryOnly;
config.vertex_table_strategy = gs::MemoryStrategy::kMemoryOnly;
config.topology_strategy = gs::MemoryStrategy::kMemoryOnly;
#endif
config.memory_level = memory_level;
config.enable_auto_compaction = true;
config.service_port = http_port;
db.Open(config);
Expand Down
22 changes: 11 additions & 11 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,9 @@ Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
GraphDBConfig config(schema, data_dir, thread_num);
config.warmup = warmup;
if (memory_only) {
config.allocator_strategy = MemoryStrategy::kMemoryOnly;
config.topology_strategy = MemoryStrategy::kMemoryOnly;
config.vertex_map_strategy = MemoryStrategy::kMemoryOnly;
config.vertex_table_strategy = MemoryStrategy::kMemoryOnly;
config.memory_level = 1;
} else {
config.allocator_strategy = MemoryStrategy::kSyncToFile;
config.topology_strategy = MemoryStrategy::kSyncToFile;
config.vertex_map_strategy = MemoryStrategy::kSyncToFile;
config.vertex_table_strategy = MemoryStrategy::kSyncToFile;
config.memory_level = 0;
}
config.enable_auto_compaction = enable_auto_compaction;
config.service_port = port;
Expand All @@ -97,8 +91,7 @@ Result<bool> GraphDB::Open(const GraphDBConfig& config) {
work_dir_ = data_dir;
thread_num_ = config.thread_num;
try {
graph_.Open(data_dir, config.vertex_map_strategy,
config.vertex_table_strategy, config.topology_strategy);
graph_.Open(data_dir, config.memory_level);
} catch (std::exception& e) {
LOG(ERROR) << "Exception: " << e.what();
return Result<bool>(StatusCode::InternalError,
Expand Down Expand Up @@ -129,7 +122,14 @@ Result<bool> GraphDB::Open(const GraphDBConfig& config) {
mutable_schema.EmplacePlugins(plugin_paths);

last_compaction_ts_ = 0;
openWalAndCreateContexts(data_dir, config.allocator_strategy);
MemoryStrategy allocator_strategy = MemoryStrategy::kMemoryOnly;
if (config.memory_level == 0) {
allocator_strategy = MemoryStrategy::kSyncToFile;
} else if (config.memory_level >= 2) {
allocator_strategy = MemoryStrategy::kHugepagePrefered;
}

openWalAndCreateContexts(data_dir, allocator_strategy);

if ((!create_empty_graph) && config.warmup) {
graph_.Warmup(thread_num_);
Expand Down
17 changes: 9 additions & 8 deletions flex/engines/graph_db/database/graph_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,22 @@ struct GraphDBConfig {
warmup(false),
enable_auto_compaction(false),
service_port(-1),
vertex_map_strategy(MemoryStrategy::kMemoryOnly),
vertex_table_strategy(MemoryStrategy::kMemoryOnly),
topology_strategy(MemoryStrategy::kMemoryOnly),
allocator_strategy(MemoryStrategy::kMemoryOnly) {}
memory_level(1) {}

Schema schema;
std::string data_dir;
int thread_num;
bool warmup;
bool enable_auto_compaction;
int service_port;
MemoryStrategy vertex_map_strategy;
MemoryStrategy vertex_table_strategy;
MemoryStrategy topology_strategy;
MemoryStrategy allocator_strategy;

/*
0 - sync with disk;
1 - mmap virtual memory;
2 - prefering hugepages;
3 - force hugepages;
*/
int memory_level;
};

class GraphDB {
Expand Down
6 changes: 0 additions & 6 deletions flex/storages/rt_mutable_graph/dual_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@ class DualCsrBase {
const std::string& edata_name,
const std::string& snapshot_dir,
size_t src_vertex_cap, size_t dst_vertex_cap) = 0;
#ifdef HUGEPAGE
virtual void OpenWithHugepages(const std::string& oe_name,
const std::string& ie_name,
const std::string& edata_name,
const std::string& snapshot_dir,
size_t src_vertex_cap,
size_t dst_vertex_cap) = 0;
#endif
virtual void Dump(const std::string& oe_name, const std::string& ie_name,
const std::string& edata_name,
const std::string& new_snapshot_dir) = 0;
Expand Down Expand Up @@ -120,15 +118,13 @@ class DualCsr : public DualCsrBase {
out_csr_->open_in_memory(snapshot_dir + "/" + oe_name, src_vertex_cap);
}

#ifdef HUGEPAGE
void OpenWithHugepages(const std::string& oe_name, const std::string& ie_name,
const std::string& edata_name,
const std::string& snapshot_dir, size_t src_vertex_cap,
size_t dst_vertex_cap) override {
in_csr_->open_with_hugepages(snapshot_dir + "/" + ie_name, dst_vertex_cap);
out_csr_->open_with_hugepages(snapshot_dir + "/" + oe_name, src_vertex_cap);
}
#endif

void Dump(const std::string& oe_name, const std::string& ie_name,
const std::string& edata_name,
Expand Down Expand Up @@ -261,14 +257,12 @@ class DualCsr<std::string_view> : public DualCsrBase {
column_.resize(std::max(column_.size() + (column_.size() + 4) / 5, 4096ul));
}

#ifdef HUGEPAGE
void OpenWithHugepages(const std::string& oe_name, const std::string& ie_name,
const std::string& edata_name,
const std::string& snapshot_dir, size_t src_vertex_cap,
size_t dst_vertex_cap) override {
LOG(FATAL) << "not supported...";
}
#endif

void Dump(const std::string& oe_name, const std::string& ie_name,
const std::string& edata_name,
Expand Down
8 changes: 0 additions & 8 deletions flex/storages/rt_mutable_graph/mutable_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,12 +501,10 @@ class MutableCsrBase {
const std::string& work_dir) = 0;

virtual void open_in_memory(const std::string& prefix, size_t v_cap = 0) = 0;
#ifdef HUGEPAGE
virtual void open_with_hugepages(const std::string& prefix,
size_t v_cap = 0) {
LOG(FATAL) << "not supported...";
}
#endif

virtual void dump(const std::string& name,
const std::string& new_spanshot_dir) = 0;
Expand Down Expand Up @@ -768,7 +766,6 @@ class MutableCsr : public TypedMutableCsrBase<EDATA_T> {
}
}

#ifdef HUGEPAGE
void open_with_hugepages(const std::string& prefix, size_t v_cap) override {
mmap_array<int> degree_list;
degree_list.open(prefix + ".deg", false);
Expand Down Expand Up @@ -802,7 +799,6 @@ class MutableCsr : public TypedMutableCsrBase<EDATA_T> {
delete cap_list;
}
}
#endif

void warmup(int thread_num) const override {
size_t vnum = adj_lists_.size();
Expand Down Expand Up @@ -1282,7 +1278,6 @@ class SingleMutableCsr : public TypedMutableCsrBase<EDATA_T> {
}
}

#ifdef HUGEPAGE
void open_with_hugepages(const std::string& prefix, size_t v_cap) override {
nbr_list_.open_with_hugepages(prefix + ".snbr", v_cap);
size_t old_size = nbr_list_.size();
Expand All @@ -1293,7 +1288,6 @@ class SingleMutableCsr : public TypedMutableCsrBase<EDATA_T> {
}
}
}
#endif

void dump(const std::string& name,
const std::string& new_snapshot_dir) override {
Expand Down Expand Up @@ -1640,9 +1634,7 @@ class EmptyCsr : public TypedMutableCsrBase<EDATA_T> {

void open_in_memory(const std::string& prefix, size_t v_cap) override {}

#ifdef HUGEPAGE
void open_with_hugepages(const std::string& prefix, size_t v_cap) override {}
#endif

void dump(const std::string& name,
const std::string& new_spanshot_dir) override {}
Expand Down
77 changes: 32 additions & 45 deletions flex/storages/rt_mutable_graph/mutable_property_fragment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,7 @@ inline DualCsrBase* create_csr(EdgeStrategy oes, EdgeStrategy ies,
}

void MutablePropertyFragment::Open(const std::string& work_dir,
bool memory_only) {
Open(work_dir, MemoryStrategy::kMemoryOnly, MemoryStrategy::kMemoryOnly,
MemoryStrategy::kMemoryOnly);
}

void MutablePropertyFragment::Open(const std::string& work_dir,
MemoryStrategy vertex_map_strategy,
MemoryStrategy vertex_table_strategy,
MemoryStrategy topology_strategy) {
int memory_level) {
std::string schema_file = schema_path(work_dir);
std::string snapshot_dir{};
bool build_empty_graph = false;
Expand Down Expand Up @@ -153,46 +145,44 @@ void MutablePropertyFragment::Open(const std::string& work_dir,
for (size_t i = 0; i < vertex_label_num_; ++i) {
std::string v_label_name = schema_.get_vertex_label_name(i);

if (vertex_map_strategy == MemoryStrategy::kMemoryOnly) {
lf_indexers_[i].open_in_memory(snapshot_dir + "/" +
vertex_map_prefix(v_label_name));
#ifdef HUGEPAGE
} else if (vertex_map_strategy == MemoryStrategy::kHugepagePrefered) {
lf_indexers_[i].open_with_hugepages(snapshot_dir + "/" +
vertex_map_prefix(v_label_name));
#endif
} else {
assert(vertex_map_strategy == MemoryStrategy::kSyncToFile);
if (memory_level == 0) {
lf_indexers_[i].open(vertex_map_prefix(v_label_name), snapshot_dir,
tmp_dir_path);
}

if (vertex_table_strategy == MemoryStrategy::kMemoryOnly) {
vertex_data_[i].open(vertex_table_prefix(v_label_name), snapshot_dir,
tmp_dir_path, schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name));
if (!build_empty_graph) {
vertex_data_[i].copy_to_tmp(vertex_table_prefix(v_label_name),
snapshot_dir, tmp_dir_path);
}
} else if (memory_level == 1) {
lf_indexers_[i].open_in_memory(snapshot_dir + "/" +
vertex_map_prefix(v_label_name));
vertex_data_[i].open_in_memory(
vertex_table_prefix(v_label_name), snapshot_dir,
schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name));
#ifdef HUGEPAGE
} else if (vertex_table_strategy == MemoryStrategy::kHugepagePrefered) {
} else if (memory_level == 2) {
lf_indexers_[i].open_with_hugepages(
snapshot_dir + "/" + vertex_map_prefix(v_label_name), false);
vertex_data_[i].open_with_hugepages(
vertex_table_prefix(v_label_name), snapshot_dir,
schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name));
#endif
schema_.get_vertex_storage_strategies(v_label_name), false);
} else {
assert(vertex_table_strategy == MemoryStrategy::kSyncToFile);
vertex_data_[i].open(vertex_table_prefix(v_label_name), snapshot_dir,
tmp_dir_path, schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name));
}
if (!build_empty_graph &&
(vertex_table_strategy == MemoryStrategy::kSyncToFile)) {
vertex_data_[i].copy_to_tmp(vertex_table_prefix(v_label_name),
snapshot_dir, tmp_dir_path);
assert(memory_level == 3);
lf_indexers_[i].open_with_hugepages(
snapshot_dir + "/" + vertex_map_prefix(v_label_name), true);
vertex_data_[i].open_with_hugepages(
vertex_table_prefix(v_label_name), snapshot_dir,
schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name), true);
}

size_t vertex_capacity =
schema_.get_max_vnum(v_label_name); // lf_indexers_[i].capacity();
if (build_empty_graph) {
Expand Down Expand Up @@ -236,27 +226,24 @@ void MutablePropertyFragment::Open(const std::string& work_dir,
create_csr(oe_strategy, ie_strategy, properties);
ie_[index] = dual_csr_list_[index]->GetInCsr();
oe_[index] = dual_csr_list_[index]->GetOutCsr();
if (topology_strategy == MemoryStrategy::kMemoryOnly) {
dual_csr_list_[index]->OpenInMemory(
if (memory_level == 0) {
dual_csr_list_[index]->Open(
oe_prefix(src_label, dst_label, edge_label),
ie_prefix(src_label, dst_label, edge_label),
edata_prefix(src_label, dst_label, edge_label), snapshot_dir,
vertex_capacities[src_label_i], vertex_capacities[dst_label_i]);
#ifdef HUGEPAGE
} else if (topology_strategy == MemoryStrategy::kHugepagePrefered) {
tmp_dir_path);
} else if (memory_level >= 2) {
dual_csr_list_[index]->OpenWithHugepages(
oe_prefix(src_label, dst_label, edge_label),
ie_prefix(src_label, dst_label, edge_label),
edata_prefix(src_label, dst_label, edge_label), snapshot_dir,
vertex_capacities[src_label_i], vertex_capacities[dst_label_i]);
#endif
} else {
assert(topology_strategy == MemoryStrategy::kSyncToFile);
dual_csr_list_[index]->Open(
dual_csr_list_[index]->OpenInMemory(
oe_prefix(src_label, dst_label, edge_label),
ie_prefix(src_label, dst_label, edge_label),
edata_prefix(src_label, dst_label, edge_label), snapshot_dir,
tmp_dir_path);
vertex_capacities[src_label_i], vertex_capacities[dst_label_i]);
}
ie_[index]->resize(vertex_capacities[dst_label_i]);
oe_[index]->resize(vertex_capacities[src_label_i]);
Expand Down
6 changes: 1 addition & 5 deletions flex/storages/rt_mutable_graph/mutable_property_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ class MutablePropertyFragment {
vid_t dst_lid, label_t edge_label, timestamp_t ts,
const Any& arc, Allocator& alloc);

void Open(const std::string& work_dir, bool memory_only);

void Open(const std::string& work_dir, MemoryStrategy vertex_map_strategy,
MemoryStrategy vertex_table_strategy,
MemoryStrategy topology_strategy);
void Open(const std::string& work_dir, int memory_level);

void Compact(uint32_t version);

Expand Down
Loading
Loading