Skip to content

Commit

Permalink
fix(interactive): Fix some issues of AdminService (#3761)
Browse files Browse the repository at this point in the history
- Create a graph with no edge types.
- Return the detail message when error happens. e.g. creating graph with
vertex have `DT_DOUBLE` primary keys(currently not supported)
- Support open and running query on an empty graph.
- Other issues.

Fix #3707 
Fix #3693
  • Loading branch information
zhanglei1949 authored May 17, 2024
1 parent f15605b commit 6e5dbc1
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 159 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/flex.yml
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ jobs:
rm -rf /tmp/csr-data-dir/
cd ${GITHUB_WORKSPACE}/flex/build/
SCHEMA_FILE=${GS_TEST_DIR}/type_test/graph.yaml
BULK_LOAD_FILE=${GS_TEST_DIR}/type_test/import.yaml
SCHEMA_FILE=${GS_TEST_DIR}/flex/type_test/graph.yaml
BULK_LOAD_FILE=${GS_TEST_DIR}/flex/type_test/import.yaml
GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/ -p 2
- name: Test Graph Loading on LDBC SNB sf0.1
Expand Down
21 changes: 1 addition & 20 deletions flex/engines/hqps_db/core/operator/group_by.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,10 +390,7 @@ class GroupByOp {
std::integer_sequence<int32_t, 0>>>) {
auto& builder = std::get<0>(value_set_builder_tuple);
auto size = ctx.GetHead().Size();
std::tuple<std::tuple<grape::EmptyType>> empty_tuple;
for (size_t i = 0; i < size; ++i) {
builder.insert(0, empty_tuple, empty_tuple);
}
builder.inc_count(0, size);
} else {
for (auto iter : ctx) {
auto ele_tuple = iter.GetAllIndexElement();
Expand All @@ -410,22 +407,6 @@ class GroupByOp {
std::make_index_sequence<grouped_value_num>());
return RES_T(std::move(std::get<0>(value_set_built)),
ctx.get_sub_task_start_tag());

// // create offset array with one-one mapping.
// if (grouped_value_num == 1) {
// } else {
// auto offset_vec = make_offset_vector(
// grouped_value_num - 1, std::get<0>(value_set_built).size() + 1);
// VLOG(10) << "after group by, the set size: " << keyed_set_built.Size();
// VLOG(10) << "offset vec: " << offset_vec.size();
// VLOG(10) << "," << offset_vec[0].size();

// RES_T res(std::move(std::get<grouped_value_num - 1>(value_set_built)),
// std::move(gs::tuple_slice<0, grouped_value_num - 1>(
// std::move(value_set_built))),
// std::move(offset_vec));
// return res;
// }
}

// group by only one key_alias
Expand Down
9 changes: 8 additions & 1 deletion flex/engines/hqps_db/structures/collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,15 @@ class CountBuilder {
return true;
}

bool inc_count(size_t ind, int32_t v) {
while (vec_.size() <= ind) {
vec_.emplace_back(0);
}
vec_[ind] += v;
return true;
}

Collection<int64_t> Build() {
// VLOG(10) << "Finish building counter" << gs::to_string(vec_);
return Collection<int64_t>(std::move(vec_));
}

Expand Down
29 changes: 25 additions & 4 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ std::string merge_graph_and_plugin_meta(
return res.empty() ? "{}" : res.dump();
}

gs::Result<YAML::Node> preprocess_vertex_schema(YAML::Node root,
const std::string& type_name) {
// 1. To support open a empty graph, we should check if the x_csr_params is
// set for each vertex type, if not set, we set it to a rather small max_vnum,
// to avoid to much memory usage.
auto types = root[type_name];
for (auto type : types) {
if (!type["x_csr_params"]) {
type["x_csr_params"]["max_vertex_num"] = 8192;
}
}
return types;
}

gs::Result<YAML::Node> preprocess_vertex_edge_types(
YAML::Node root, const std::string& type_name) {
auto types = root[type_name];
Expand Down Expand Up @@ -94,13 +108,16 @@ gs::Result<YAML::Node> preprocess_vertex_edge_types(
// vertex/edge types should all set.
// 2. If property_id or type_id is not set, then set them according to the order
gs::Result<YAML::Node> preprocess_graph_schema(YAML::Node&& node) {
if (node["schema"] && node["schema"]["vertex_types"] &&
node["schema"]["edge_types"]) {
if (node["schema"] && node["schema"]["vertex_types"]) {
// First check whether property_id or type_id is set in the schema
RETURN_IF_NOT_OK(
preprocess_vertex_edge_types(node["schema"], "vertex_types"));
RETURN_IF_NOT_OK(
preprocess_vertex_edge_types(node["schema"], "edge_types"));
RETURN_IF_NOT_OK(preprocess_vertex_schema(node["schema"], "vertex_types"));
if (node["schema"]["edge_types"]) {
// edge_type could be optional.
RETURN_IF_NOT_OK(
preprocess_vertex_edge_types(node["schema"], "edge_types"));
}
return node;
} else {
return gs::Status(gs::StatusCode::InvalidSchema, "Invalid graph schema: ");
Expand Down Expand Up @@ -925,6 +942,7 @@ seastar::future<admin_query_result> admin_actor::start_service(
// use the previous thread num
auto thread_num = db.SessionNum();
db.Close();
VLOG(10) << "Closed the previous graph db";
if (!db.Open(schema_value, data_dir_value, thread_num).ok()) {
LOG(ERROR) << "Fail to load graph from data directory: "
<< data_dir_value;
Expand All @@ -938,6 +956,8 @@ seastar::future<admin_query_result> admin_actor::start_service(
gs::StatusCode::InternalError,
"Fail to load graph from data directory: " + data_dir_value)));
}
LOG(INFO) << "Successfully load graph from data directory: "
<< data_dir_value;
// unlock the previous graph
if (graph_name != cur_running_graph) {
auto unlock_res =
Expand All @@ -951,6 +971,7 @@ seastar::future<admin_query_result> admin_actor::start_service(
gs::Result<seastar::sstring>(unlock_res.status()));
}
}
LOG(INFO) << "Update running graph to: " << graph_name;
auto set_res = metadata_store_->SetRunningGraph(graph_name);
if (!set_res.ok()) {
LOG(ERROR) << "Fail to set running graph: " << graph_name;
Expand Down
16 changes: 13 additions & 3 deletions flex/engines/http_server/workdir_manipulator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -631,12 +631,16 @@ std::string WorkDirManipulator::GetCompilerLogFile() {
return log_path;
}

std::string WorkDirManipulator::CommitTempIndices(const std::string& graph_id) {
gs::Result<std::string> WorkDirManipulator::CommitTempIndices(
const std::string& graph_id) {
auto temp_indices_dir = GetTempIndicesDir(graph_id);
auto indices_dir = GetGraphIndicesDir(graph_id);
if (std::filesystem::exists(indices_dir)) {
std::filesystem::remove_all(indices_dir);
}
if (!std::filesystem::exists(temp_indices_dir)) {
return {gs::Status(gs::StatusCode::NotFound, "Temp indices dir not found")};
}
std::filesystem::rename(temp_indices_dir, indices_dir);
return indices_dir;
}
Expand Down Expand Up @@ -774,7 +778,7 @@ gs::Result<seastar::sstring> WorkDirManipulator::load_graph_impl(
VLOG(10) << "Graph loader finished, job_id: " << internal_job_id
<< ", res: " << res;

LOG(INFO) << "Updating graph meta";
LOG(INFO) << "Updating job meta and graph meta";
auto exit_request = gs::UpdateJobMetaRequest::NewFinished(res);
auto update_exit_res =
metadata_store->UpdateJobMeta(internal_job_id, exit_request);
Expand All @@ -800,7 +804,13 @@ gs::Result<seastar::sstring> WorkDirManipulator::load_graph_impl(

LOG(INFO) << "Committing temp indices for graph: "
<< copied_graph_id;
WorkDirManipulator::CommitTempIndices(copied_graph_id);
auto commit_res =
WorkDirManipulator::CommitTempIndices(copied_graph_id);
if (!commit_res.ok()) {
LOG(ERROR) << "Fail to commit temp indices for graph: "
<< copied_graph_id;
return gs::Result<seastar::sstring>(commit_res.status());
}
return gs::Result<seastar::sstring>(
"Finish Loading and commit temp "
"indices");
Expand Down
3 changes: 2 additions & 1 deletion flex/engines/http_server/workdir_manipulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ class WorkDirManipulator {
static std::string CleanTempIndicesDir(const std::string& graph_name);

// Move the temp indices to the graph indices dir.
static std::string CommitTempIndices(const std::string& graph_name);
static gs::Result<std::string> CommitTempIndices(
const std::string& graph_name);

private:
static std::string get_tmp_bulk_loading_job_log_path(
Expand Down
10 changes: 8 additions & 2 deletions flex/storages/metadata/graph_meta_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ std::string GraphMeta::ToJson() const {
json["stored_procedures"].push_back(
nlohmann::json::parse(plugin_meta.ToJson()));
}
json["store_type"] = store_type;
return json.dump();
}

Expand Down Expand Up @@ -107,6 +108,11 @@ GraphMeta GraphMeta::FromJson(const nlohmann::json& json) {
meta.plugin_metas.push_back(PluginMeta::FromJson(plugin));
}
}
if (json.contains("store_type")) {
meta.store_type = json["store_type"].get<std::string>();
} else {
meta.store_type = "mutable_csr";
}
return meta;
}

Expand Down Expand Up @@ -636,8 +642,8 @@ CreateJobMetaRequest CreateJobMetaRequest::NewRunning(

std::string CreateJobMetaRequest::ToString() const {
nlohmann::json json;
json["graph_id"] = graph_id;
json["process_id"] = process_id;
json["detail"]["graph_id"] = graph_id;
json["detail"]["process_id"] = process_id;
json["start_time"] = start_time;
json["status"] = std::to_string(status);
json["log_path"] = log_path;
Expand Down
1 change: 1 addition & 0 deletions flex/storages/metadata/graph_meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ struct GraphMeta {
uint64_t data_update_time;
std::string data_import_config;
std::string schema;
std::string store_type{"mutable_csr"};

std::vector<PluginMeta> plugin_metas;

Expand Down
Loading

0 comments on commit 6e5dbc1

Please sign in to comment.