Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): Fix some issues of AdminService #3761

Merged
merged 5 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/flex.yml
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ jobs:
rm -rf /tmp/csr-data-dir/

cd ${GITHUB_WORKSPACE}/flex/build/
SCHEMA_FILE=${GS_TEST_DIR}/type_test/graph.yaml
BULK_LOAD_FILE=${GS_TEST_DIR}/type_test/import.yaml
SCHEMA_FILE=${GS_TEST_DIR}/flex/type_test/graph.yaml
BULK_LOAD_FILE=${GS_TEST_DIR}/flex/type_test/import.yaml
GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/ -p 2

- name: Test Graph Loading on LDBC SNB sf0.1
Expand Down
21 changes: 1 addition & 20 deletions flex/engines/hqps_db/core/operator/group_by.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,10 +390,7 @@ class GroupByOp {
std::integer_sequence<int32_t, 0>>>) {
auto& builder = std::get<0>(value_set_builder_tuple);
auto size = ctx.GetHead().Size();
std::tuple<std::tuple<grape::EmptyType>> empty_tuple;
for (size_t i = 0; i < size; ++i) {
builder.insert(0, empty_tuple, empty_tuple);
}
builder.inc_count(0, size);
} else {
for (auto iter : ctx) {
auto ele_tuple = iter.GetAllIndexElement();
Expand All @@ -410,22 +407,6 @@ class GroupByOp {
std::make_index_sequence<grouped_value_num>());
return RES_T(std::move(std::get<0>(value_set_built)),
ctx.get_sub_task_start_tag());

// // create offset array with one-one mapping.
// if (grouped_value_num == 1) {
// } else {
// auto offset_vec = make_offset_vector(
// grouped_value_num - 1, std::get<0>(value_set_built).size() + 1);
// VLOG(10) << "after group by, the set size: " << keyed_set_built.Size();
// VLOG(10) << "offset vec: " << offset_vec.size();
// VLOG(10) << "," << offset_vec[0].size();

// RES_T res(std::move(std::get<grouped_value_num - 1>(value_set_built)),
// std::move(gs::tuple_slice<0, grouped_value_num - 1>(
// std::move(value_set_built))),
// std::move(offset_vec));
// return res;
// }
}

// group by only one key_alias
Expand Down
9 changes: 8 additions & 1 deletion flex/engines/hqps_db/structures/collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,15 @@ class CountBuilder {
return true;
}

bool inc_count(size_t ind, int32_t v) {
while (vec_.size() <= ind) {
vec_.emplace_back(0);
}
vec_[ind] += v;
return true;
}

Collection<int64_t> Build() {
// VLOG(10) << "Finish building counter" << gs::to_string(vec_);
return Collection<int64_t>(std::move(vec_));
}

Expand Down
29 changes: 25 additions & 4 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ std::string merge_graph_and_plugin_meta(
return res.empty() ? "{}" : res.dump();
}

gs::Result<YAML::Node> preprocess_vertex_schema(YAML::Node root,
const std::string& type_name) {
// 1. To support open a empty graph, we should check if the x_csr_params is
// set for each vertex type, if not set, we set it to a rather small max_vnum,
// to avoid to much memory usage.
auto types = root[type_name];
for (auto type : types) {
if (!type["x_csr_params"]) {
type["x_csr_params"]["max_vertex_num"] = 8192;
}
}
return types;
}

gs::Result<YAML::Node> preprocess_vertex_edge_types(
YAML::Node root, const std::string& type_name) {
auto types = root[type_name];
Expand Down Expand Up @@ -94,13 +108,16 @@ gs::Result<YAML::Node> preprocess_vertex_edge_types(
// vertex/edge types should all set.
// 2. If property_id or type_id is not set, then set them according to the order
gs::Result<YAML::Node> preprocess_graph_schema(YAML::Node&& node) {
if (node["schema"] && node["schema"]["vertex_types"] &&
node["schema"]["edge_types"]) {
if (node["schema"] && node["schema"]["vertex_types"]) {
// First check whether property_id or type_id is set in the schema
RETURN_IF_NOT_OK(
preprocess_vertex_edge_types(node["schema"], "vertex_types"));
RETURN_IF_NOT_OK(
preprocess_vertex_edge_types(node["schema"], "edge_types"));
RETURN_IF_NOT_OK(preprocess_vertex_schema(node["schema"], "vertex_types"));
if (node["schema"]["edge_types"]) {
// edge_type could be optional.
RETURN_IF_NOT_OK(
preprocess_vertex_edge_types(node["schema"], "edge_types"));
}
return node;
} else {
return gs::Status(gs::StatusCode::InvalidSchema, "Invalid graph schema: ");
Expand Down Expand Up @@ -925,6 +942,7 @@ seastar::future<admin_query_result> admin_actor::start_service(
// use the previous thread num
auto thread_num = db.SessionNum();
db.Close();
VLOG(10) << "Closed the previous graph db";
if (!db.Open(schema_value, data_dir_value, thread_num).ok()) {
LOG(ERROR) << "Fail to load graph from data directory: "
<< data_dir_value;
Expand All @@ -938,6 +956,8 @@ seastar::future<admin_query_result> admin_actor::start_service(
gs::StatusCode::InternalError,
"Fail to load graph from data directory: " + data_dir_value)));
}
LOG(INFO) << "Successfully load graph from data directory: "
<< data_dir_value;
// unlock the previous graph
if (graph_name != cur_running_graph) {
auto unlock_res =
Expand All @@ -951,6 +971,7 @@ seastar::future<admin_query_result> admin_actor::start_service(
gs::Result<seastar::sstring>(unlock_res.status()));
}
}
LOG(INFO) << "Update running graph to: " << graph_name;
auto set_res = metadata_store_->SetRunningGraph(graph_name);
if (!set_res.ok()) {
LOG(ERROR) << "Fail to set running graph: " << graph_name;
Expand Down
16 changes: 13 additions & 3 deletions flex/engines/http_server/workdir_manipulator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -631,12 +631,16 @@ std::string WorkDirManipulator::GetCompilerLogFile() {
return log_path;
}

std::string WorkDirManipulator::CommitTempIndices(const std::string& graph_id) {
gs::Result<std::string> WorkDirManipulator::CommitTempIndices(
const std::string& graph_id) {
auto temp_indices_dir = GetTempIndicesDir(graph_id);
auto indices_dir = GetGraphIndicesDir(graph_id);
if (std::filesystem::exists(indices_dir)) {
std::filesystem::remove_all(indices_dir);
}
if (!std::filesystem::exists(temp_indices_dir)) {
return {gs::Status(gs::StatusCode::NotFound, "Temp indices dir not found")};
}
std::filesystem::rename(temp_indices_dir, indices_dir);
return indices_dir;
}
Expand Down Expand Up @@ -774,7 +778,7 @@ gs::Result<seastar::sstring> WorkDirManipulator::load_graph_impl(
VLOG(10) << "Graph loader finished, job_id: " << internal_job_id
<< ", res: " << res;

LOG(INFO) << "Updating graph meta";
LOG(INFO) << "Updating job meta and graph meta";
auto exit_request = gs::UpdateJobMetaRequest::NewFinished(res);
auto update_exit_res =
metadata_store->UpdateJobMeta(internal_job_id, exit_request);
Expand All @@ -800,7 +804,13 @@ gs::Result<seastar::sstring> WorkDirManipulator::load_graph_impl(

LOG(INFO) << "Committing temp indices for graph: "
<< copied_graph_id;
WorkDirManipulator::CommitTempIndices(copied_graph_id);
auto commit_res =
WorkDirManipulator::CommitTempIndices(copied_graph_id);
if (!commit_res.ok()) {
LOG(ERROR) << "Fail to commit temp indices for graph: "
<< copied_graph_id;
return gs::Result<seastar::sstring>(commit_res.status());
}
return gs::Result<seastar::sstring>(
"Finish Loading and commit temp "
"indices");
Expand Down
3 changes: 2 additions & 1 deletion flex/engines/http_server/workdir_manipulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ class WorkDirManipulator {
static std::string CleanTempIndicesDir(const std::string& graph_name);

// Move the temp indices to the graph indices dir.
static std::string CommitTempIndices(const std::string& graph_name);
static gs::Result<std::string> CommitTempIndices(
const std::string& graph_name);

private:
static std::string get_tmp_bulk_loading_job_log_path(
Expand Down
10 changes: 8 additions & 2 deletions flex/storages/metadata/graph_meta_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ std::string GraphMeta::ToJson() const {
json["stored_procedures"].push_back(
nlohmann::json::parse(plugin_meta.ToJson()));
}
json["store_type"] = store_type;
return json.dump();
}

Expand Down Expand Up @@ -107,6 +108,11 @@ GraphMeta GraphMeta::FromJson(const nlohmann::json& json) {
meta.plugin_metas.push_back(PluginMeta::FromJson(plugin));
}
}
if (json.contains("store_type")) {
meta.store_type = json["store_type"].get<std::string>();
} else {
meta.store_type = "mutable_csr";
}
return meta;
}

Expand Down Expand Up @@ -636,8 +642,8 @@ CreateJobMetaRequest CreateJobMetaRequest::NewRunning(

std::string CreateJobMetaRequest::ToString() const {
nlohmann::json json;
json["graph_id"] = graph_id;
json["process_id"] = process_id;
json["detail"]["graph_id"] = graph_id;
json["detail"]["process_id"] = process_id;
json["start_time"] = start_time;
json["status"] = std::to_string(status);
json["log_path"] = log_path;
Expand Down
1 change: 1 addition & 0 deletions flex/storages/metadata/graph_meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ struct GraphMeta {
uint64_t data_update_time;
std::string data_import_config;
std::string schema;
std::string store_type{"mutable_csr"};

std::vector<PluginMeta> plugin_metas;

Expand Down