diff --git a/analytical_engine/core/context/labeled_vertex_property_context.h b/analytical_engine/core/context/labeled_vertex_property_context.h index cd372a09f94b..9fc8be6fd17a 100644 --- a/analytical_engine/core/context/labeled_vertex_property_context.h +++ b/analytical_engine/core/context/labeled_vertex_property_context.h @@ -224,7 +224,7 @@ class LabeledVertexPropertyContextWrapper switch (selector.type()) { case SelectorType::kVertexId: { - auto type_id = trans_utils.GetOidTypeId(); + BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId()); if (comm_spec.fid() == 0) { *arc << static_cast(type_id); *arc << total_num; @@ -316,7 +316,7 @@ class LabeledVertexPropertyContextWrapper switch (selector.type()) { case SelectorType::kVertexId: { - auto type_id = trans_utils.GetOidTypeId(); + BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId()); if (comm_spec.fid() == 0) { *arc << static_cast(type_id); diff --git a/analytical_engine/core/context/vertex_data_context.h b/analytical_engine/core/context/vertex_data_context.h index c16b9290901a..49167cc3abd0 100644 --- a/analytical_engine/core/context/vertex_data_context.h +++ b/analytical_engine/core/context/vertex_data_context.h @@ -222,7 +222,7 @@ class VertexDataContextWrapper : public IVertexDataContextWrapper { switch (selector.type()) { case SelectorType::kVertexId: { // N.B. This method must be invoked on every worker! - auto type_id = trans_utils.GetOidTypeId(); + BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId()); if (comm_spec.fid() == 0) { *arc << static_cast(type_id); *arc << total_num; @@ -304,7 +304,7 @@ class VertexDataContextWrapper : public IVertexDataContextWrapper { size_t old_size; switch (selector.type()) { case SelectorType::kVertexId: { - auto type_id = trans_utils.GetOidTypeId(); + BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId()); if (comm_spec.fid() == 0) { *arc << static_cast(type_id); } diff --git a/analytical_engine/core/context/vertex_property_context.h b/analytical_engine/core/context/vertex_property_context.h index 52a93bb2ce75..74ee57d9f532 100644 --- a/analytical_engine/core/context/vertex_property_context.h +++ b/analytical_engine/core/context/vertex_property_context.h @@ -183,7 +183,7 @@ class VertexPropertyContextWrapper : public IVertexPropertyContextWrapper { switch (selector.type()) { case SelectorType::kVertexId: { - auto type_id = trans_utils.GetOidTypeId(); + BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId()); if (comm_spec.fid() == 0) { *arc << static_cast(type_id); *arc << total_num; @@ -263,7 +263,7 @@ class VertexPropertyContextWrapper : public IVertexPropertyContextWrapper { switch (selector.type()) { case SelectorType::kVertexId: { - auto type_id = trans_utils.GetOidTypeId(); + BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId()); if (comm_spec.fid() == 0) { *arc << static_cast(type_id); } diff --git a/analytical_engine/core/fragment/dynamic_fragment.h b/analytical_engine/core/fragment/dynamic_fragment.h index 312f083a4ee6..a297a74625a1 100644 --- a/analytical_engine/core/fragment/dynamic_fragment.h +++ b/analytical_engine/core/fragment/dynamic_fragment.h @@ -687,117 +687,6 @@ class DynamicFragment const dynamic::Value& GetSchema() { return schema_; } - auto CollectPropertyKeysOnVertices() - -> bl::result> { - std::map prop_keys; - - for (const auto& v : InnerVertices()) { - auto& data = ivdata_[v.GetValue()]; - - for (auto member = data.MemberBegin(); member != data.MemberEnd(); - ++member) { - std::string s_k = member->name.GetString(); - - if (prop_keys.find(s_k) == prop_keys.end()) { - prop_keys[s_k] = dynamic::GetType(member->value); - } else { - auto seen_type = prop_keys[s_k]; - auto curr_type = dynamic::GetType(member->value); - - if (seen_type != curr_type) { - std::stringstream ss; - ss << "OID: " << GetId(v) << " has key " << s_k << " with type " - << curr_type << " but previous type is: " << seen_type; - RETURN_GS_ERROR(vineyard::ErrorCode::kDataTypeError, ss.str()); - } - } - } - } - - return prop_keys; - } - - auto CollectPropertyKeysOnEdges() - -> bl::result> { - std::map prop_keys; - - auto extract_keys = [this, &prop_keys]( - const vertex_t& u, - const adj_list_t& es) -> bl::result { - for (auto& e : es) { - auto& data = e.data; - - for (auto member = data.MemberBegin(); member != data.MemberEnd(); - ++member) { - std::string s_k = member->name.GetString(); - - if (prop_keys.find(s_k) == prop_keys.end()) { - prop_keys[s_k] = dynamic::GetType(member->value); - } else { - auto seen_type = prop_keys[s_k]; - auto curr_type = dynamic::GetType(member->value); - - if (seen_type != curr_type) { - std::stringstream ss; - ss << "Edge (OID): " << GetId(u) << " " << GetId(e.neighbor) - << " has key " << s_k << " with type " << curr_type - << " but previous type is: " << seen_type; - RETURN_GS_ERROR(vineyard::ErrorCode::kDataTypeError, ss.str()); - } - } - } - } - return {}; - }; - - for (const auto& v : InnerVertices()) { - if (load_strategy_ == grape::LoadStrategy::kOnlyIn || - load_strategy_ == grape::LoadStrategy::kBothOutIn) { - auto es = this->GetIncomingAdjList(v); - if (es.NotEmpty()) { - BOOST_LEAF_CHECK(extract_keys(v, es)); - } - } - - if (load_strategy_ == grape::LoadStrategy::kOnlyOut || - load_strategy_ == grape::LoadStrategy::kBothOutIn) { - auto es = this->GetOutgoingAdjList(v); - if (es.NotEmpty()) { - BOOST_LEAF_CHECK(extract_keys(v, es)); - } - } - } - - return prop_keys; - } - - bl::result GetOidType(const grape::CommSpec& comm_spec) const { - auto oid_type = dynamic::Type::kNullType; - if (this->alive_ivnum_ > 0) { - // Get first alive vertex oid type. - for (vid_t lid = 0; lid < ivnum_; ++lid) { - if (iv_alive_.get_bit(lid)) { - oid_t oid; - vm_ptr_->GetOid(fid_, lid, oid); - oid_type = dynamic::GetType(oid); - } - } - } - grape::Communicator comm; - dynamic::Type max_type; - comm.InitCommunicator(comm_spec.comm()); - comm.Max(oid_type, max_type); - - if (max_type != dynamic::Type::kInt64Type && - max_type != dynamic::Type::kDoubleType && - max_type != dynamic::Type::kStringType && - max_type != dynamic::Type::kNullType) { - LOG(ERROR) << "Unsupported oid type."; - return dynamic::Type::kNullType; - } - return max_type; - } - public: using base_t::GetOutgoingAdjList; inline adj_list_t GetIncomingAdjList(const vertex_t& v) override { diff --git a/analytical_engine/core/fragment/dynamic_projected_fragment.h b/analytical_engine/core/fragment/dynamic_projected_fragment.h index e6ebc67072db..cfd59f3aafeb 100644 --- a/analytical_engine/core/fragment/dynamic_projected_fragment.h +++ b/analytical_engine/core/fragment/dynamic_projected_fragment.h @@ -577,10 +577,6 @@ class DynamicProjectedFragment { return fragment_->HasNode(node); } - bl::result GetOidType(const grape::CommSpec& comm_spec) const { - return fragment_->GetOidType(comm_spec); - } - private: fragment_t* fragment_; std::string v_prop_key_; @@ -816,10 +812,6 @@ class DynamicProjectedFragment { return fragment_->HasNode(node); } - bl::result GetOidType(const grape::CommSpec& comm_spec) const { - return fragment_->GetOidType(comm_spec); - } - private: fragment_t* fragment_; std::string v_prop_key_; diff --git a/analytical_engine/core/loader/dynamic_to_arrow_converter.h b/analytical_engine/core/loader/dynamic_to_arrow_converter.h index 0e0c79b9ce95..1544e4715148 100644 --- a/analytical_engine/core/loader/dynamic_to_arrow_converter.h +++ b/analytical_engine/core/loader/dynamic_to_arrow_converter.h @@ -632,16 +632,16 @@ class DynamicToArrowConverter { const std::shared_ptr& src_frag) { std::vector> schema_vector; std::vector> arrays; - // TODO(weibin): Replace with schema of DynamicFragment. - BOOST_LEAF_AUTO(prop_keys, src_frag->CollectPropertyKeysOnVertices()); + const auto& vertex_schema = src_frag->GetSchema()["vertex"]; // build schema and array - for (const auto& p : prop_keys) { - auto key = p.first; - auto type = p.second; + for (const auto& p : vertex_schema.GetObject()) { + std::string key = p.name.GetString(); + int type = p.value.GetInt(); + LOG(INFO) << key << " got type " << p.value.GetInt() << " " << type; switch (type) { - case dynamic::Type::kInt64Type: { + case rpc::graph::DataTypePb::LONG: { auto r = VertexArrayBuilder::build(src_frag, key); BOOST_LEAF_AUTO(array, r); @@ -649,7 +649,7 @@ class DynamicToArrowConverter { arrays.push_back(array); break; } - case dynamic::Type::kDoubleType: { + case rpc::graph::DataTypePb::DOUBLE: { auto r = VertexArrayBuilder::build(src_frag, key); BOOST_LEAF_AUTO(array, r); @@ -657,7 +657,7 @@ class DynamicToArrowConverter { arrays.push_back(array); break; } - case dynamic::Type::kStringType: { + case rpc::graph::DataTypePb::STRING: { auto r = VertexArrayBuilder::build(src_frag, key); @@ -668,7 +668,7 @@ class DynamicToArrowConverter { } default: RETURN_GS_ERROR(vineyard::ErrorCode::kDataTypeError, - "Unsupported dynamic type: " + std::to_string(type)); + "Unsupported type: " + std::to_string(type)); } } @@ -696,14 +696,13 @@ class DynamicToArrowConverter { CHECK_EQ(src_array->length(), dst_array->length()); std::vector> arrays{src_array, dst_array}; - BOOST_LEAF_AUTO(prop_keys, src_frag->CollectPropertyKeysOnEdges()); // build schema and array - for (const auto& e : prop_keys) { - auto key = e.first; - auto type = e.second; - + const auto& edge_schema = src_frag->GetSchema()["edge"]; + for (const auto& p : edge_schema.GetObject()) { + std::string key = p.name.GetString(); + int type = p.value.GetInt(); switch (type) { - case dynamic::Type::kInt64Type: { + case rpc::graph::DataTypePb::LONG: { auto r = EdgeArrayBuilder::build(src_frag, key); BOOST_LEAF_AUTO(array, r); @@ -712,7 +711,7 @@ class DynamicToArrowConverter { arrays.push_back(array); break; } - case dynamic::Type::kDoubleType: { + case rpc::graph::DataTypePb::DOUBLE: { auto r = EdgeArrayBuilder::build(src_frag, key); BOOST_LEAF_AUTO(array, r); @@ -720,7 +719,7 @@ class DynamicToArrowConverter { arrays.push_back(array); break; } - case dynamic::Type::kStringType: { + case rpc::graph::DataTypePb::STRING: { auto r = EdgeArrayBuilder::build(src_frag, key); @@ -731,7 +730,7 @@ class DynamicToArrowConverter { } default: RETURN_GS_ERROR(vineyard::ErrorCode::kDataTypeError, - "Unsupported dynamic type: " + std::to_string(type)); + "Unsupported type: " + std::to_string(type)); } } diff --git a/analytical_engine/core/object/fragment_wrapper.h b/analytical_engine/core/object/fragment_wrapper.h index ed933386f2a9..d0ad168ab8b9 100644 --- a/analytical_engine/core/object/fragment_wrapper.h +++ b/analytical_engine/core/object/fragment_wrapper.h @@ -534,7 +534,7 @@ class FragmentWrapper> switch (selector.type()) { case SelectorType::kVertexId: { - auto oid_type = trans_utils.GetOidTypeId(); + BOOST_LEAF_AUTO(oid_type, trans_utils.GetOidTypeId()); if (comm_spec.fid() == 0) { *arc << static_cast(oid_type); *arc << total_num; diff --git a/analytical_engine/core/utils/transform_utils.h b/analytical_engine/core/utils/transform_utils.h index 30eaf1bc44f7..68f8d1977942 100644 --- a/analytical_engine/core/utils/transform_utils.h +++ b/analytical_engine/core/utils/transform_utils.h @@ -25,6 +25,7 @@ limitations under the License. #include "boost/foreach.hpp" #include "boost/lexical_cast.hpp" +#include "grape/communication/communicator.h" #include "vineyard/basic/ds/dataframe.h" #include "vineyard/basic/ds/tensor.h" #include "vineyard/graph/fragment/fragment_traits.h" @@ -560,7 +561,7 @@ class TransformUtils::value; } + bl::result GetOidTypeId() { return vineyard::TypeToInt::value; } std::vector SelectVertices( label_id_t label_id, const std::pair& range) { @@ -786,7 +787,7 @@ class TransformUtils< explicit TransformUtils(const grape::CommSpec& comm_spec, const FRAG_T& frag) : comm_spec_(comm_spec), frag_(frag) {} - int GetOidTypeId() { return vineyard::TypeToInt::value; } + bl::result GetOidTypeId() { return vineyard::TypeToInt::value; } std::vector SelectVertices( const std::pair& range) { @@ -909,19 +910,38 @@ class TransformUtils< * @param comm_spec * @return */ - int GetOidTypeId() { - auto r = frag_.GetOidType(comm_spec_); - if (!r) { - return -1; + bl::result GetOidTypeId() { + dynamic::Type oid_type = dynamic::Type::kNullType; + auto vm_ptr = frag_.GetVertexMap(); + if (frag_.GetInnerVerticesNum() > 0) { + for (auto& v : frag_.InnerVertices()) { + if (frag_.IsAliveInnerVertex(v)) { + dynamic::Value oid; + vm_ptr->GetOid(frag_.fid(), v.GetValue(), oid); + oid_type = dynamic::GetType(oid); + break; + } + } + } + + grape::Communicator comm; + comm.InitCommunicator(comm_spec_.comm()); + std::vector gather_type; + comm.AllGather(oid_type, gather_type); + for (auto& t : gather_type) { + if (oid_type != t) { + std::stringstream ss; + ss << "Exist different oid type between fragments"; + RETURN_GS_ERROR(vineyard::ErrorCode::kDataTypeError, ss.str()); + } } - auto dynamic_type = r.value(); - if (dynamic_type == dynamic::Type::kInt64Type) { + + if (oid_type == dynamic::Type::kInt64Type) { return vineyard::TypeToInt::value; - } else if (dynamic_type == dynamic::Type::kStringType) { + } else if (oid_type == dynamic::Type::kStringType) { return vineyard::TypeToInt::value; - } else { - // if is null, return 0, means np.dtype(void) in numpy - return 0; + } else if (oid_type == dynamic::Type::kNullType) { + return vineyard::TypeToInt::value; } return -1; } @@ -952,9 +972,9 @@ class TransformUtils< bl::result> VertexIdToArrowArray() { auto inner_vertices = frag_.InnerVertices(); - BOOST_LEAF_AUTO(oid_type, frag_.GetOidType(comm_spec_)); + BOOST_LEAF_AUTO(oid_type, GetOidTypeId()); - if (oid_type == dynamic::Type::kInt64Type) { + if (oid_type == vineyard::TypeToInt::value) { typename vineyard::ConvertToArrowType::BuilderType builder; for (auto& v : inner_vertices) { ARROW_OK_OR_RAISE(builder.Append(frag_.GetId(v).GetInt64())); @@ -963,7 +983,7 @@ class TransformUtils< ret; ARROW_OK_OR_RAISE(builder.Finish(&ret)); return std::dynamic_pointer_cast(ret); - } else if (oid_type == dynamic::Type::kStringType) { + } else if (oid_type == vineyard::TypeToInt::value) { typename vineyard::ConvertToArrowType::BuilderType builder; for (auto v : inner_vertices) { ARROW_OK_OR_RAISE(builder.Append(frag_.GetId(v).GetString())); @@ -984,9 +1004,9 @@ class TransformUtils< std::vector shape{static_cast(vertices.size())}; std::vector part_idx{comm_spec_.fid()}; - BOOST_LEAF_AUTO(oid_type, frag_.GetOidType(comm_spec_)); + BOOST_LEAF_AUTO(oid_type, GetOidTypeId()); - if (oid_type == dynamic::Type::kInt64Type) { + if (oid_type == vineyard::TypeToInt::value) { auto tensor_builder = std::make_shared>( client, shape, part_idx); for (size_t i = 0; i < vertices.size(); i++) { @@ -994,7 +1014,7 @@ class TransformUtils< } return std::dynamic_pointer_cast( tensor_builder); - } else if (oid_type == dynamic::Type::kStringType) { + } else if (oid_type == vineyard::TypeToInt::value) { auto tensor_builder = std::make_shared>(client, shape, part_idx); @@ -1012,16 +1032,16 @@ class TransformUtils< bl::result VertexIdToVYTensor( vineyard::Client& client, const std::vector& vertices) { BOOST_LEAF_AUTO(base_builder, VertexIdToVYTensorBuilder(client, vertices)); - BOOST_LEAF_AUTO(oid_type, frag_.GetOidType(comm_spec_)); + BOOST_LEAF_AUTO(oid_type, GetOidTypeId()); - if (oid_type == dynamic::Type::kInt64Type) { + if (oid_type == vineyard::TypeToInt::value) { auto builder = std::dynamic_pointer_cast>( base_builder); auto tensor = builder->Seal(client); VY_OK_OR_RAISE(tensor->Persist(client)); return tensor->id(); - } else if (oid_type == dynamic::Type::kStringType) { + } else if (oid_type == vineyard::TypeToInt::value) { auto builder = std::dynamic_pointer_cast>( base_builder); diff --git a/analytical_engine/frame/property_graph_frame.cc b/analytical_engine/frame/property_graph_frame.cc index 507c8a4cdb57..17d677681b58 100644 --- a/analytical_engine/frame/property_graph_frame.cc +++ b/analytical_engine/frame/property_graph_frame.cc @@ -172,9 +172,11 @@ void ToArrowFragment( auto dynamic_frag = std::static_pointer_cast( wrapper_in->fragment()); - BOOST_LEAF_AUTO(oid_type, dynamic_frag->GetOidType(comm_spec)); + gs::TransformUtils trans_utils(comm_spec, + *dynamic_frag); + BOOST_LEAF_AUTO(oid_type, trans_utils.GetOidTypeId()); - if (oid_type == gs::dynamic::Type::kInt64Type && + if (oid_type == vineyard::TypeToInt::value && !std::is_same::value && !std::is_same::value) { RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, @@ -183,7 +185,7 @@ void ToArrowFragment( std::string(vineyard::type_name())); } - if (oid_type == gs::dynamic::Type::kStringType && + if (oid_type == vineyard::TypeToInt::value && !std::is_same::value) { RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, "The oid type of DynamicFragment is string, but the "