From baf432255e6758515c90429d81b5ec787a4c305b Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Mon, 29 Apr 2024 08:42:08 +0000 Subject: [PATCH 1/4] fix some issue fix remove some comment add store_type --- flex/engines/hqps_db/core/operator/group_by.h | 21 +- flex/engines/hqps_db/structures/collection.h | 9 +- .../http_server/actor/admin_actor.act.cc | 29 +- .../http_server/workdir_manipulator.cc | 16 +- .../engines/http_server/workdir_manipulator.h | 3 +- .../metadata/default_graph_meta_store.cc | 2 + flex/storages/metadata/graph_meta_store.cc | 10 +- flex/storages/metadata/graph_meta_store.h | 1 + flex/storages/rt_mutable_graph/schema.cc | 287 +++++++++++------- .../common/ir/meta/schema/Utils.java | 21 +- 10 files changed, 242 insertions(+), 157 deletions(-) diff --git a/flex/engines/hqps_db/core/operator/group_by.h b/flex/engines/hqps_db/core/operator/group_by.h index aff1efe7b872..c2ee6e0df8e6 100644 --- a/flex/engines/hqps_db/core/operator/group_by.h +++ b/flex/engines/hqps_db/core/operator/group_by.h @@ -390,10 +390,7 @@ class GroupByOp { std::integer_sequence>>) { auto& builder = std::get<0>(value_set_builder_tuple); auto size = ctx.GetHead().Size(); - std::tuple> empty_tuple; - for (size_t i = 0; i < size; ++i) { - builder.insert(0, empty_tuple, empty_tuple); - } + builder.inc_count(0, size); } else { for (auto iter : ctx) { auto ele_tuple = iter.GetAllIndexElement(); @@ -410,22 +407,6 @@ class GroupByOp { std::make_index_sequence()); return RES_T(std::move(std::get<0>(value_set_built)), ctx.get_sub_task_start_tag()); - - // // create offset array with one-one mapping. - // if (grouped_value_num == 1) { - // } else { - // auto offset_vec = make_offset_vector( - // grouped_value_num - 1, std::get<0>(value_set_built).size() + 1); - // VLOG(10) << "after group by, the set size: " << keyed_set_built.Size(); - // VLOG(10) << "offset vec: " << offset_vec.size(); - // VLOG(10) << "," << offset_vec[0].size(); - - // RES_T res(std::move(std::get(value_set_built)), - // std::move(gs::tuple_slice<0, grouped_value_num - 1>( - // std::move(value_set_built))), - // std::move(offset_vec)); - // return res; - // } } // group by only one key_alias diff --git a/flex/engines/hqps_db/structures/collection.h b/flex/engines/hqps_db/structures/collection.h index 3eba157d38ce..0c9addabc95e 100644 --- a/flex/engines/hqps_db/structures/collection.h +++ b/flex/engines/hqps_db/structures/collection.h @@ -426,8 +426,15 @@ class CountBuilder { return true; } + bool inc_count(size_t ind, int32_t v) { + while (vec_.size() <= ind) { + vec_.emplace_back(0); + } + vec_[ind] += v; + return true; + } + Collection Build() { - // VLOG(10) << "Finish building counter" << gs::to_string(vec_); return Collection(std::move(vec_)); } diff --git a/flex/engines/http_server/actor/admin_actor.act.cc b/flex/engines/http_server/actor/admin_actor.act.cc index c0bd8a5ecac8..304202199c23 100644 --- a/flex/engines/http_server/actor/admin_actor.act.cc +++ b/flex/engines/http_server/actor/admin_actor.act.cc @@ -50,6 +50,20 @@ std::string merge_graph_and_plugin_meta( return res.empty() ? "{}" : res.dump(); } +gs::Result preprocess_vertex_schema(YAML::Node root, + const std::string& type_name) { + // 1. To support open a empty graph, we should check if the x_csr_params is + // set for each vertex type, if not set, we set it to a rather small max_vnum, + // to avoid to much memory usage. + auto types = root[type_name]; + for (auto type : types) { + if (!type["x_csr_params"]) { + type["x_csr_params"]["max_vertex_num"] = 8192; + } + } + return types; +} + gs::Result preprocess_vertex_edge_types( YAML::Node root, const std::string& type_name) { auto types = root[type_name]; @@ -94,13 +108,16 @@ gs::Result preprocess_vertex_edge_types( // vertex/edge types should all set. // 2. If property_id or type_id is not set, then set them according to the order gs::Result preprocess_graph_schema(YAML::Node&& node) { - if (node["schema"] && node["schema"]["vertex_types"] && - node["schema"]["edge_types"]) { + if (node["schema"] && node["schema"]["vertex_types"]) { // First check whether property_id or type_id is set in the schema RETURN_IF_NOT_OK( preprocess_vertex_edge_types(node["schema"], "vertex_types")); - RETURN_IF_NOT_OK( - preprocess_vertex_edge_types(node["schema"], "edge_types")); + RETURN_IF_NOT_OK(preprocess_vertex_schema(node["schema"], "vertex_types")); + if (node["schema"]["edge_types"]) { + // edge_type could be optional. + RETURN_IF_NOT_OK( + preprocess_vertex_edge_types(node["schema"], "edge_types")); + } return node; } else { return gs::Status(gs::StatusCode::InvalidSchema, "Invalid graph schema: "); @@ -925,6 +942,7 @@ seastar::future admin_actor::start_service( // use the previous thread num auto thread_num = db.SessionNum(); db.Close(); + VLOG(10) << "Closed the previous graph db"; if (!db.Open(schema_value, data_dir_value, thread_num).ok()) { LOG(ERROR) << "Fail to load graph from data directory: " << data_dir_value; @@ -938,6 +956,8 @@ seastar::future admin_actor::start_service( gs::StatusCode::InternalError, "Fail to load graph from data directory: " + data_dir_value))); } + LOG(INFO) << "Successfully load graph from data directory: " + << data_dir_value; // unlock the previous graph if (graph_name != cur_running_graph) { auto unlock_res = @@ -951,6 +971,7 @@ seastar::future admin_actor::start_service( gs::Result(unlock_res.status())); } } + LOG(INFO) << "Update running graph to: " << graph_name; auto set_res = metadata_store_->SetRunningGraph(graph_name); if (!set_res.ok()) { LOG(ERROR) << "Fail to set running graph: " << graph_name; diff --git a/flex/engines/http_server/workdir_manipulator.cc b/flex/engines/http_server/workdir_manipulator.cc index 5fc2ee5cb8b8..4c80e7600f09 100644 --- a/flex/engines/http_server/workdir_manipulator.cc +++ b/flex/engines/http_server/workdir_manipulator.cc @@ -631,12 +631,16 @@ std::string WorkDirManipulator::GetCompilerLogFile() { return log_path; } -std::string WorkDirManipulator::CommitTempIndices(const std::string& graph_id) { +gs::Result WorkDirManipulator::CommitTempIndices( + const std::string& graph_id) { auto temp_indices_dir = GetTempIndicesDir(graph_id); auto indices_dir = GetGraphIndicesDir(graph_id); if (std::filesystem::exists(indices_dir)) { std::filesystem::remove_all(indices_dir); } + if (!std::filesystem::exists(temp_indices_dir)) { + return {gs::Status(gs::StatusCode::NotFound, "Temp indices dir not found")}; + } std::filesystem::rename(temp_indices_dir, indices_dir); return indices_dir; } @@ -774,7 +778,7 @@ gs::Result WorkDirManipulator::load_graph_impl( VLOG(10) << "Graph loader finished, job_id: " << internal_job_id << ", res: " << res; - LOG(INFO) << "Updating graph meta"; + LOG(INFO) << "Updating job meta and graph meta"; auto exit_request = gs::UpdateJobMetaRequest::NewFinished(res); auto update_exit_res = metadata_store->UpdateJobMeta(internal_job_id, exit_request); @@ -800,7 +804,13 @@ gs::Result WorkDirManipulator::load_graph_impl( LOG(INFO) << "Committing temp indices for graph: " << copied_graph_id; - WorkDirManipulator::CommitTempIndices(copied_graph_id); + auto commit_res = + WorkDirManipulator::CommitTempIndices(copied_graph_id); + if (!commit_res.ok()) { + LOG(ERROR) << "Fail to commit temp indices for graph: " + << copied_graph_id; + return gs::Result(commit_res.status()); + } return gs::Result( "Finish Loading and commit temp " "indices"); diff --git a/flex/engines/http_server/workdir_manipulator.h b/flex/engines/http_server/workdir_manipulator.h index 347197090e14..98766b437aa0 100644 --- a/flex/engines/http_server/workdir_manipulator.h +++ b/flex/engines/http_server/workdir_manipulator.h @@ -166,7 +166,8 @@ class WorkDirManipulator { static std::string CleanTempIndicesDir(const std::string& graph_name); // Move the temp indices to the graph indices dir. - static std::string CommitTempIndices(const std::string& graph_name); + static gs::Result CommitTempIndices( + const std::string& graph_name); private: static std::string get_tmp_bulk_loading_job_log_path( diff --git a/flex/storages/metadata/default_graph_meta_store.cc b/flex/storages/metadata/default_graph_meta_store.cc index 711a07a79132..e1d9d30bef2b 100644 --- a/flex/storages/metadata/default_graph_meta_store.cc +++ b/flex/storages/metadata/default_graph_meta_store.cc @@ -272,6 +272,7 @@ Result DefaultGraphMetaStore::DeleteJobMeta(const JobId& job_id) { Result DefaultGraphMetaStore::UpdateJobMeta( const JobId& job_id, const UpdateJobMetaRequest& update_request) { + LOG(INFO) << "Update job meta: " << job_id; return base_store_->UpdateMeta( JOB_META, job_id, [&update_request](const std::string& old_meta) { nlohmann::json json; @@ -282,6 +283,7 @@ Result DefaultGraphMetaStore::UpdateJobMeta( return Result( Status(StatusCode::InternalError, "Fail to parse old job meta")); } + LOG(INFO) << "old job meta: " << json.dump(); auto job_meta = JobMeta::FromJson(json); if (update_request.status.has_value()) { job_meta.status = update_request.status.value(); diff --git a/flex/storages/metadata/graph_meta_store.cc b/flex/storages/metadata/graph_meta_store.cc index 4bb34554081d..ce575db6c1f2 100644 --- a/flex/storages/metadata/graph_meta_store.cc +++ b/flex/storages/metadata/graph_meta_store.cc @@ -73,6 +73,7 @@ std::string GraphMeta::ToJson() const { json["stored_procedures"].push_back( nlohmann::json::parse(plugin_meta.ToJson())); } + json["store_type"] = store_type; return json.dump(); } @@ -107,6 +108,11 @@ GraphMeta GraphMeta::FromJson(const nlohmann::json& json) { meta.plugin_metas.push_back(PluginMeta::FromJson(plugin)); } } + if (json.contains("store_type")) { + meta.store_type = json["store_type"].get(); + } else { + meta.store_type = "mutable_csr"; + } return meta; } @@ -636,8 +642,8 @@ CreateJobMetaRequest CreateJobMetaRequest::NewRunning( std::string CreateJobMetaRequest::ToString() const { nlohmann::json json; - json["graph_id"] = graph_id; - json["process_id"] = process_id; + json["detail"]["graph_id"] = graph_id; + json["detail"]["process_id"] = process_id; json["start_time"] = start_time; json["status"] = std::to_string(status); json["log_path"] = log_path; diff --git a/flex/storages/metadata/graph_meta_store.h b/flex/storages/metadata/graph_meta_store.h index 5bf9461fd947..6ab649e6a5d4 100644 --- a/flex/storages/metadata/graph_meta_store.h +++ b/flex/storages/metadata/graph_meta_store.h @@ -63,6 +63,7 @@ struct GraphMeta { uint64_t data_update_time; std::string data_import_config; std::string schema; + std::string store_type{"mutable_csr"}; std::vector plugin_metas; diff --git a/flex/storages/rt_mutable_graph/schema.cc b/flex/storages/rt_mutable_graph/schema.cc index f88454d107da..5340e88d2d40 100644 --- a/flex/storages/rt_mutable_graph/schema.cc +++ b/flex/storages/rt_mutable_graph/schema.cc @@ -515,21 +515,23 @@ static bool parse_property_type(YAML::Node node, PropertyType& type) { } } -static bool parse_vertex_properties(YAML::Node node, - const std::string& label_name, - std::vector& types, - std::vector& names, - std::vector& strategies, - const std::string& version) { +static Status parse_vertex_properties(YAML::Node node, + const std::string& label_name, + std::vector& types, + std::vector& names, + std::vector& strategies, + const std::string& version) { if (!node || !node.IsSequence()) { LOG(ERROR) << "Expect properties for " << label_name << " to be a sequence"; - return false; + return Status(StatusCode::InvalidSchema, + "Expect properties for " + label_name + " to be a sequence"); } int prop_num = node.size(); if (prop_num == 0) { LOG(ERROR) << "At least one property is needed for " << label_name; - return false; + return Status(StatusCode::InvalidSchema, + "At least one property is needed for " + label_name); } for (int i = 0; i < prop_num; ++i) { @@ -537,19 +539,25 @@ static bool parse_vertex_properties(YAML::Node node, if (!get_scalar(node[i], "property_name", prop_name_str)) { LOG(ERROR) << "Name of vertex-" << label_name << " prop-" << i - 1 << " is not specified..."; - return false; + return Status(StatusCode::InvalidSchema, + "Name of vertex-" + label_name + " prop-" + + std::to_string(i - 1) + " is not specified..."); } if (!node[i]["property_type"]) { LOG(ERROR) << "type of vertex-" << label_name << " prop-" << i - 1 << " is not specified..."; - return false; + return Status(StatusCode::InvalidSchema, + "type of vertex-" + label_name + " prop-" + + std::to_string(i - 1) + " is not specified..."); } auto prop_type_node = node[i]["property_type"]; PropertyType prop_type; if (!parse_property_type(prop_type_node, prop_type)) { LOG(ERROR) << "Fail to parse property type of vertex-" << label_name << " prop-" << i - 1; - return false; + return Status(StatusCode::InvalidSchema, + "Fail to parse property type of vertex-" + label_name + + " prop-" + std::to_string(i - 1)); } { if (node[i]["x_csr_params"]) { @@ -563,22 +571,24 @@ static bool parse_vertex_properties(YAML::Node node, names.push_back(prop_name_str); } - return true; + return Status::OK(); } -static bool parse_edge_properties(YAML::Node node, - const std::string& label_name, - std::vector& types, - std::vector& names, - const std::string& version) { +static Status parse_edge_properties(YAML::Node node, + const std::string& label_name, + std::vector& types, + std::vector& names, + const std::string& version) { if (!node) { VLOG(10) << "Found no edge properties specified for edge: " << label_name; - return true; + return Status::OK(); } if (!node.IsSequence()) { LOG(ERROR) << "properties of edge -" << label_name << " not set properly, should be a sequence..."; - return false; + return Status(StatusCode::InvalidSchema, + "properties of edge -" + label_name + + " not set properly, should be a sequence..."); } int prop_num = node.size(); @@ -588,37 +598,44 @@ static bool parse_edge_properties(YAML::Node node, if (!node[i]["property_type"]) { LOG(ERROR) << "type of edge-" << label_name << " prop-" << i - 1 << " is not specified..."; - return false; + return Status(StatusCode::InvalidSchema, + "type of edge-" + label_name + " prop-" + + std::to_string(i - 1) + " is not specified..."); } auto prop_type_node = node[i]["property_type"]; PropertyType prop_type; if (!parse_property_type(prop_type_node, prop_type)) { LOG(ERROR) << "type of edge-" << label_name << " prop-" << i - 1 << " is not specified..."; - return false; + return Status(StatusCode::InvalidSchema, + "type of edge-" + label_name + " prop-" + + std::to_string(i - 1) + " is not specified..."); } if (!get_scalar(node[i], "property_name", prop_name_str)) { LOG(ERROR) << "name of edge-" << label_name << " prop-" << i - 1 << " is not specified..."; - return false; + return Status(StatusCode::InvalidSchema, + "name of edge-" + label_name + " prop-" + + std::to_string(i - 1) + " is not specified..."); } types.push_back(prop_type); names.push_back(prop_name_str); } - return true; + return Status::OK(); } -static bool parse_vertex_schema(YAML::Node node, Schema& schema) { +static Status parse_vertex_schema(YAML::Node node, Schema& schema) { std::string label_name; if (!get_scalar(node, "type_name", label_name)) { - return false; + return Status(StatusCode::InvalidSchema, "vertex type_name is not set"); } // Cannot add two vertex label with same name if (schema.has_vertex_label(label_name)) { LOG(ERROR) << "Vertex label " << label_name << " already exists"; - return false; + return Status(StatusCode::InvalidSchema, + "Vertex label " + label_name + " already exists"); } size_t max_num = ((size_t) 1) << 32; @@ -637,27 +654,28 @@ static bool parse_vertex_schema(YAML::Node node, Schema& schema) { if (node["nullable"]) { LOG(ERROR) << "nullable is not supported yet"; - return false; + return Status(StatusCode::Unimplemented, "nullable is not supported yet"); } if (node["default_value"]) { LOG(ERROR) << "default_value is not supported yet"; - return false; + return Status(StatusCode::Unimplemented, + "default_value is not supported yet"); } - if (!parse_vertex_properties(node["properties"], label_name, property_types, - property_names, strategies, - schema.GetVersion())) { - return false; - } + RETURN_IF_NOT_OK(parse_vertex_properties(node["properties"], label_name, + property_types, property_names, + strategies, schema.GetVersion())); if (!node["primary_keys"]) { LOG(ERROR) << "Expect field primary_keys for " << label_name; - return false; + return Status(StatusCode::InvalidSchema, + "Expect field primary_keys for " + label_name); } auto primary_key_node = node["primary_keys"]; if (!primary_key_node.IsSequence()) { LOG(ERROR) << "[Primary_keys] should be sequence"; - return false; + return Status(StatusCode::InvalidSchema, + "[Primary_keys] should be sequence"); } // remove primary key from properties. @@ -675,7 +693,9 @@ static bool parse_vertex_schema(YAML::Node node, Schema& schema) { if (primary_key_inds[i] == -1) { LOG(ERROR) << "Primary key " << primary_key_name << " is not found in properties"; - return false; + return Status( + StatusCode::InvalidSchema, + "Primary key " + primary_key_name + " is not found in properties"); } if (property_types[primary_key_inds[i]] != PropertyType::kInt64 && property_types[primary_key_inds[i]] != PropertyType::kString && @@ -684,7 +704,9 @@ static bool parse_vertex_schema(YAML::Node node, Schema& schema) { property_types[primary_key_inds[i]] != PropertyType::kUInt32) { LOG(ERROR) << "Primary key " << primary_key_name << " should be int64 or string"; - return false; + return Status( + StatusCode::InvalidSchema, + "Primary key " + primary_key_name + " should be int64 or string"); } primary_keys.emplace_back(property_types[primary_key_inds[i]], property_names[primary_key_inds[i]], @@ -701,56 +723,57 @@ static bool parse_vertex_schema(YAML::Node node, Schema& schema) { int32_t type_id; if (!get_scalar(node, "type_id", type_id)) { LOG(ERROR) << "type_id is not set properly for type: " << label_name; - return false; + return Status(StatusCode::InvalidSchema, + "type_id is not set properly for type: " + label_name); } auto label_id = schema.get_vertex_label_id(label_name); if (label_id != type_id) { LOG(ERROR) << "type_id is not equal to label_id for type: " << label_name; - return false; + return Status(StatusCode::InvalidSchema, + "type_id is not equal to label_id for type: " + label_name); } - return true; + return Status::OK(); } -static bool parse_vertices_schema(YAML::Node node, Schema& schema) { +static Status parse_vertices_schema(YAML::Node node, Schema& schema) { if (!node.IsSequence()) { LOG(ERROR) << "vertex is not set properly"; - return false; + return Status(StatusCode::InvalidSchema, "vertex is not set properly"); } int num = node.size(); for (int i = 0; i < num; ++i) { - if (!parse_vertex_schema(node[i], schema)) { - return false; - } + RETURN_IF_NOT_OK(parse_vertex_schema(node[i], schema)); } - return true; + return Status::OK(); } -static bool parse_edge_schema(YAML::Node node, Schema& schema) { +static Status parse_edge_schema(YAML::Node node, Schema& schema) { std::string edge_label_name; if (!node["type_name"]) { LOG(ERROR) << "edge type_name is not set properly"; - return false; + return Status(StatusCode::InvalidSchema, + "edge type_name is not set properly"); } edge_label_name = node["type_name"].as(); std::vector property_types; std::vector prop_names; std::string description; // default is empty string - if (!parse_edge_properties(node["properties"], edge_label_name, - property_types, prop_names, schema.GetVersion())) { - return false; - } + RETURN_IF_NOT_OK(parse_edge_properties(node["properties"], edge_label_name, + property_types, prop_names, + schema.GetVersion())); if (node["description"]) { description = node["description"].as(); } if (node["nullable"]) { LOG(ERROR) << "nullable is not supported yet"; - return false; + return Status(StatusCode::Unimplemented, "nullable is not supported yet"); } if (node["default_value"]) { LOG(ERROR) << "default_value is not supported yet"; - return false; + return Status(StatusCode::Unimplemented, + "default_value is not supported yet"); } EdgeStrategy default_ie = EdgeStrategy::kMultiple; @@ -762,11 +785,13 @@ static bool parse_edge_schema(YAML::Node node, Schema& schema) { // vertex_type_pair_node can be a list or a map if (!vertex_type_pair_node) { LOG(ERROR) << "edge [vertex_type_pair_relations] is not set"; - return false; + return Status(StatusCode::InvalidSchema, + "edge [vertex_type_pair_relations] is not set"); } if (!vertex_type_pair_node.IsSequence()) { LOG(ERROR) << "edge [vertex_type_pair_relations] should be a sequence"; - return false; + return Status(StatusCode::InvalidSchema, + "edge [vertex_type_pair_relations] should be a sequence"); } for (size_t i = 0; i < vertex_type_pair_node.size(); ++i) { std::string src_label_name, dst_label_name; @@ -777,19 +802,25 @@ static bool parse_edge_schema(YAML::Node node, Schema& schema) { if (!get_scalar(cur_node, "source_vertex", src_label_name)) { LOG(ERROR) << "Expect field source_vertex for edge [" << edge_label_name << "] in vertex_type_pair_relations"; - return false; + return Status(StatusCode::InvalidSchema, + "Expect field source_vertex for edge [" + edge_label_name + + "] in vertex_type_pair_relations"); } if (!get_scalar(cur_node, "destination_vertex", dst_label_name)) { LOG(ERROR) << "Expect field destination_vertex for edge [" << edge_label_name << "] in vertex_type_pair_relations"; - return false; + return Status(StatusCode::InvalidSchema, + "Expect field destination_vertex for edge [" + + edge_label_name + "] in vertex_type_pair_relations"); } // check whether edge triplet exists in current schema if (schema.has_edge_label(src_label_name, dst_label_name, edge_label_name)) { LOG(ERROR) << "Edge [" << edge_label_name << "] from [" << src_label_name << "] to [" << dst_label_name << "] already exists"; - return false; + return Status(StatusCode::InvalidSchema, + "Edge [" + edge_label_name + "] from [" + src_label_name + + "] to [" + dst_label_name + "] already exists"); } std::string relation_str; @@ -824,7 +855,11 @@ static bool parse_edge_schema(YAML::Node node, Schema& schema) { LOG(ERROR) << "edge_storage_strategy is not set properly for edge: " << src_label_name << "-[" << edge_label_name << "]->" << dst_label_name; - return false; + return Status( + StatusCode::InvalidSchema, + "edge_storage_strategy is not set properly for edge: " + + src_label_name + "-[" + edge_label_name + "]->" + + dst_label_name); } } } @@ -847,7 +882,10 @@ static bool parse_edge_schema(YAML::Node node, Schema& schema) { LOG(ERROR) << "sort_on_compaction is not set properly for edge: " << src_label_name << "-[" << edge_label_name << "]->" << dst_label_name << "expect TRUE/FALSE"; - return false; + return Status(StatusCode::InvalidSchema, + "sort_on_compaction is not set properly for edge: " + + src_label_name + "-[" + edge_label_name + "]->" + + dst_label_name + "expect TRUE/FALSE"); } } } else { @@ -870,7 +908,10 @@ static bool parse_edge_schema(YAML::Node node, Schema& schema) { << src_label_name << "-[" << edge_label_name << "]->" << dst_label_name << ", expect IMMUTABLE/MUTABLE, got:" << mutability_str; - return false; + return Status(StatusCode::InvalidSchema, + "oe_mutability is not set properly for edge: " + + src_label_name + "-[" + edge_label_name + "]->" + + dst_label_name + ", expect IMMUTABLE/MUTABLE"); } } } @@ -889,7 +930,10 @@ static bool parse_edge_schema(YAML::Node node, Schema& schema) { << src_label_name << "-[" << edge_label_name << "]->" << dst_label_name << ", expect IMMUTABLE/MUTABLE, got:" << mutability_str; - return false; + return Status(StatusCode::InvalidSchema, + "ie_mutability is not set properly for edge: " + + src_label_name + "-[" + edge_label_name + "]->" + + dst_label_name + ", expect IMMUTABLE/MUTABLE"); } } } @@ -908,35 +952,36 @@ static bool parse_edge_schema(YAML::Node node, Schema& schema) { int32_t type_id; if (!get_scalar(node, "type_id", type_id)) { LOG(ERROR) << "type_id is not set properly for type: " << edge_label_name; - return false; + return Status(StatusCode::InvalidSchema, + "type_id is not set properly for type: " + edge_label_name); } auto label_id = schema.get_edge_label_id(edge_label_name); if (label_id != type_id) { LOG(ERROR) << "type_id is not equal to label_id for type: " << edge_label_name; - return false; + return Status( + StatusCode::InvalidSchema, + "type_id is not equal to label_id for type: " + edge_label_name); } - return true; + return Status::OK(); } -static bool parse_edges_schema(YAML::Node node, Schema& schema) { +static Status parse_edges_schema(YAML::Node node, Schema& schema) { if (!node.IsSequence()) { LOG(ERROR) << "edge is not set properly"; - return false; + return Status(StatusCode::InvalidSchema, "edge is not set properly"); } int num = node.size(); VLOG(10) << "Try to parse " << num << "edge configuration"; for (int i = 0; i < num; ++i) { - if (!parse_edge_schema(node[i], schema)) { - return false; - } + RETURN_IF_NOT_OK(parse_edge_schema(node[i], schema)); } - return true; + return Status::OK(); } -static bool parse_stored_procedures_v00(const YAML::Node& stored_procedure_node, - const std::string& parent_dir, - Schema& schema) { +static Status parse_stored_procedures_v00( + const YAML::Node& stored_procedure_node, const std::string& parent_dir, + Schema& schema) { std::string directory = "plugins"; // default plugin directory if (stored_procedure_node["directory"]) { directory = stored_procedure_node["directory"].as(); @@ -957,7 +1002,8 @@ static bool parse_stored_procedures_v00(const YAML::Node& stored_procedure_node, std::vector plugin_names; if (!get_sequence(stored_procedure_node, "enable_lists", plugin_names)) { LOG(ERROR) << "stored_procedures is not set properly"; - return false; + return Status(StatusCode::InvalidSchema, + "stored_procedures is not set properly"); } for (auto& plugin_name : plugin_names) { plugin_name_or_path.emplace_back(plugin_name, ""); @@ -969,16 +1015,17 @@ static bool parse_stored_procedures_v00(const YAML::Node& stored_procedure_node, // for name, we try to find the plugin in the directory if (!schema.EmplacePlugins(plugin_name_or_path)) { LOG(ERROR) << "Fail to emplace all plugins"; - return false; + return Status(StatusCode::InvalidSchema, "Fail to emplace all plugins"); } - return true; + return Status::OK(); } -static bool parse_stored_procedures_v01(const YAML::Node& stored_procedure_node, - Schema& schema) { +static Status parse_stored_procedures_v01( + const YAML::Node& stored_procedure_node, Schema& schema) { if (!stored_procedure_node.IsSequence()) { LOG(ERROR) << "stored_procedures is not set properly"; - return false; + return Status(StatusCode::InvalidSchema, + "stored_procedures is not set properly"); } std::vector> plugin_name_and_path; for (auto& cur_node : stored_procedure_node) { @@ -988,20 +1035,21 @@ static bool parse_stored_procedures_v01(const YAML::Node& stored_procedure_node, cur_node["library"].as())); } else { LOG(ERROR) << "Library or name set properly for stored procedure"; - return false; + return Status(StatusCode::InvalidSchema, + "Library or name set properly for stored procedure"); } } // emplace all the plugins if (!schema.EmplacePlugins(plugin_name_and_path)) { LOG(ERROR) << "Fail to emplace all plugins"; - return false; + return Status(StatusCode::InvalidSchema, "Fail to emplace all plugins"); } - return true; + return Status::OK(); } -static bool parse_stored_procedures(const YAML::Node& stored_procedure_node, - const std::string& parent_dir, - Schema& schema) { +static Status parse_stored_procedures(const YAML::Node& stored_procedure_node, + const std::string& parent_dir, + Schema& schema) { auto version = schema.GetVersion(); if (version == "v0.0") { return parse_stored_procedures_v00(stored_procedure_node, parent_dir, @@ -1010,16 +1058,19 @@ static bool parse_stored_procedures(const YAML::Node& stored_procedure_node, return parse_stored_procedures_v01(stored_procedure_node, schema); } else { LOG(ERROR) << "Unrecognized version: " << version; - return false; + return Status( + StatusCode::InvalidSchema, + "Unsupported version when parsing stored procedures: " + version); } } -static bool parse_schema_from_yaml_node(const YAML::Node& graph_node, - Schema& schema, - const std::string& parent_dir = "") { +static Status parse_schema_from_yaml_node(const YAML::Node& graph_node, + Schema& schema, + const std::string& parent_dir = "") { if (!graph_node || !graph_node.IsMap()) { - LOG(ERROR) << "graph is not set properly"; - return false; + LOG(ERROR) << "graph schema is not set properly"; + return Status(StatusCode::InvalidSchema, + "graph schema is not set properly"); } if (!expect_config(graph_node, "store_type", std::string("mutable_csr"))) { LOG(WARNING) << "store_type is not set properly, use default value: " @@ -1039,7 +1090,8 @@ static bool parse_schema_from_yaml_node(const YAML::Node& graph_node, if (std::find(supported_versions.begin(), supported_versions.end(), version) == supported_versions.end()) { LOG(ERROR) << "Unsupported schema version: " << version; - return false; + return Status(StatusCode::InvalidSchema, + "Unsupported schema version: " + version); } schema.SetVersion(version); } else { @@ -1050,31 +1102,27 @@ static bool parse_schema_from_yaml_node(const YAML::Node& graph_node, auto schema_node = graph_node["schema"]; if (!graph_node["schema"]) { - LOG(ERROR) << "schema is not set in scheme yaml file"; - return false; + LOG(ERROR) << "expect schema field, but not found"; + return Status(StatusCode::InvalidSchema, + "expect schema field, but not found"); } - if (!parse_vertices_schema(schema_node["vertex_types"], schema)) { - return false; - } + RETURN_IF_NOT_OK(parse_vertices_schema(schema_node["vertex_types"], schema)); if (schema_node["edge_types"]) { - if (!parse_edges_schema(schema_node["edge_types"], schema)) { - return false; - } + RETURN_IF_NOT_OK(parse_edges_schema(schema_node["edge_types"], schema)); } LOG(INFO) << "Parse stored_procedures"; if (graph_node["stored_procedures"]) { - if (!parse_stored_procedures(graph_node["stored_procedures"], parent_dir, - schema)) { - LOG(ERROR) << "Fail to parse stored procedures"; - } + RETURN_IF_NOT_OK(parse_stored_procedures(graph_node["stored_procedures"], + parent_dir, schema)); } - return true; + return Status::OK(); } -static bool parse_schema_config_file(const std::string& path, Schema& schema) { +static Status parse_schema_config_file(const std::string& path, + Schema& schema) { YAML::Node graph_node = YAML::LoadFile(path); // get the directory of path auto parent_dir = std::filesystem::path(path).parent_path().string(); @@ -1260,22 +1308,27 @@ bool Schema::has_edge_label(const std::string& src_label, Result Schema::LoadFromYaml(const std::string& schema_config) { Schema schema; if (!schema_config.empty() && std::filesystem::exists(schema_config)) { - if (!config_parsing::parse_schema_config_file(schema_config, schema)) { - LOG(ERROR) << "Failed to parse schema config file: " << schema_config; - return Result( - Status(StatusCode::InvalidSchema, "Failed to parse schema"), schema); + auto status = + config_parsing::parse_schema_config_file(schema_config, schema); + if (status.ok()) { + return schema; + } else { + return Result(status); } } - return schema; + return Result( + Status(StatusCode::InvalidSchema, "Schema config file not found")); } Result Schema::LoadFromYamlNode(const YAML::Node& schema_yaml_node) { Schema schema; - if (!config_parsing::parse_schema_from_yaml_node(schema_yaml_node, schema)) { - return Result( - Status(StatusCode::InvalidSchema, "Failed to parse schema"), schema); + auto status = + config_parsing::parse_schema_from_yaml_node(schema_yaml_node, schema); + if (status.ok()) { + return schema; + } else { + return Result(status); } - return schema; } const std::vector& Schema::GetCompatibleVersions() { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/Utils.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/Utils.java index 8557a387f424..9bee31bec865 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/Utils.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/Utils.java @@ -61,15 +61,18 @@ public static final GraphSchema buildSchemaFromYaml(String schemaYaml) { edgeMap, propNameToIdMap, typeConvertor); - builderGraphElementFromYaml( - (List) - Objects.requireNonNull( - schemaMap.get("edge_types"), "edge_types not exist in yaml config"), - "EDGE", - vertexMap, - edgeMap, - propNameToIdMap, - typeConvertor); + if (schemaMap.get("edge_types") != null) { + builderGraphElementFromYaml( + (List) + Objects.requireNonNull( + schemaMap.get("edge_types"), + "edge_types not exist in yaml config"), + "EDGE", + vertexMap, + edgeMap, + propNameToIdMap, + typeConvertor); + } return new DefaultGraphSchema(vertexMap, edgeMap, propNameToIdMap); } From d350374e2600a4aca92222cb29208480e6e724c8 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 15 May 2024 06:35:58 +0000 Subject: [PATCH 2/4] don't return error if parsing stored_procedures failed --- flex/storages/rt_mutable_graph/schema.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/flex/storages/rt_mutable_graph/schema.cc b/flex/storages/rt_mutable_graph/schema.cc index 5340e88d2d40..9a7bf11e0a7e 100644 --- a/flex/storages/rt_mutable_graph/schema.cc +++ b/flex/storages/rt_mutable_graph/schema.cc @@ -1002,8 +1002,6 @@ static Status parse_stored_procedures_v00( std::vector plugin_names; if (!get_sequence(stored_procedure_node, "enable_lists", plugin_names)) { LOG(ERROR) << "stored_procedures is not set properly"; - return Status(StatusCode::InvalidSchema, - "stored_procedures is not set properly"); } for (auto& plugin_name : plugin_names) { plugin_name_or_path.emplace_back(plugin_name, ""); From 4cb0ed66aca789cfba61645051d7c33a43748a28 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 16 May 2024 06:43:11 +0000 Subject: [PATCH 3/4] fix --- flex/storages/metadata/default_graph_meta_store.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/flex/storages/metadata/default_graph_meta_store.cc b/flex/storages/metadata/default_graph_meta_store.cc index e1d9d30bef2b..711a07a79132 100644 --- a/flex/storages/metadata/default_graph_meta_store.cc +++ b/flex/storages/metadata/default_graph_meta_store.cc @@ -272,7 +272,6 @@ Result DefaultGraphMetaStore::DeleteJobMeta(const JobId& job_id) { Result DefaultGraphMetaStore::UpdateJobMeta( const JobId& job_id, const UpdateJobMetaRequest& update_request) { - LOG(INFO) << "Update job meta: " << job_id; return base_store_->UpdateMeta( JOB_META, job_id, [&update_request](const std::string& old_meta) { nlohmann::json json; @@ -283,7 +282,6 @@ Result DefaultGraphMetaStore::UpdateJobMeta( return Result( Status(StatusCode::InternalError, "Fail to parse old job meta")); } - LOG(INFO) << "old job meta: " << json.dump(); auto job_meta = JobMeta::FromJson(json); if (update_request.status.has_value()) { job_meta.status = update_request.status.value(); From 2b059c223ba756090f202ed4b69fe8bd58e53358 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 16 May 2024 07:55:12 +0000 Subject: [PATCH 4/4] fix ci --- .github/workflows/flex.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/flex.yml b/.github/workflows/flex.yml index 3eb7baea5644..4690c2ad654e 100644 --- a/.github/workflows/flex.yml +++ b/.github/workflows/flex.yml @@ -196,8 +196,8 @@ jobs: rm -rf /tmp/csr-data-dir/ cd ${GITHUB_WORKSPACE}/flex/build/ - SCHEMA_FILE=${GS_TEST_DIR}/type_test/graph.yaml - BULK_LOAD_FILE=${GS_TEST_DIR}/type_test/import.yaml + SCHEMA_FILE=${GS_TEST_DIR}/flex/type_test/graph.yaml + BULK_LOAD_FILE=${GS_TEST_DIR}/flex/type_test/import.yaml GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/ -p 2 - name: Test Graph Loading on LDBC SNB sf0.1