Skip to content

Commit

Permalink
Revise DynamicToArrow with existed engine schema of `DynamicFragmen…
Browse files Browse the repository at this point in the history
…t` (#1501)
  • Loading branch information
acezen committed May 6, 2022
1 parent 779c93d commit 9357e13
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class LabeledVertexPropertyContextWrapper

switch (selector.type()) {
case SelectorType::kVertexId: {
auto type_id = trans_utils.GetOidTypeId();
BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId());
if (comm_spec.fid() == 0) {
*arc << static_cast<int>(type_id);
*arc << total_num;
Expand Down Expand Up @@ -316,7 +316,7 @@ class LabeledVertexPropertyContextWrapper

switch (selector.type()) {
case SelectorType::kVertexId: {
auto type_id = trans_utils.GetOidTypeId();
BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId());

if (comm_spec.fid() == 0) {
*arc << static_cast<int>(type_id);
Expand Down
4 changes: 2 additions & 2 deletions analytical_engine/core/context/vertex_data_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class VertexDataContextWrapper : public IVertexDataContextWrapper {
switch (selector.type()) {
case SelectorType::kVertexId: {
// N.B. This method must be invoked on every worker!
auto type_id = trans_utils.GetOidTypeId();
BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId());
if (comm_spec.fid() == 0) {
*arc << static_cast<int>(type_id);
*arc << total_num;
Expand Down Expand Up @@ -304,7 +304,7 @@ class VertexDataContextWrapper : public IVertexDataContextWrapper {
size_t old_size;
switch (selector.type()) {
case SelectorType::kVertexId: {
auto type_id = trans_utils.GetOidTypeId();
BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId());
if (comm_spec.fid() == 0) {
*arc << static_cast<int>(type_id);
}
Expand Down
4 changes: 2 additions & 2 deletions analytical_engine/core/context/vertex_property_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class VertexPropertyContextWrapper : public IVertexPropertyContextWrapper {

switch (selector.type()) {
case SelectorType::kVertexId: {
auto type_id = trans_utils.GetOidTypeId();
BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId());
if (comm_spec.fid() == 0) {
*arc << static_cast<int>(type_id);
*arc << total_num;
Expand Down Expand Up @@ -263,7 +263,7 @@ class VertexPropertyContextWrapper : public IVertexPropertyContextWrapper {

switch (selector.type()) {
case SelectorType::kVertexId: {
auto type_id = trans_utils.GetOidTypeId();
BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId());
if (comm_spec.fid() == 0) {
*arc << static_cast<int>(type_id);
}
Expand Down
111 changes: 0 additions & 111 deletions analytical_engine/core/fragment/dynamic_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -687,117 +687,6 @@ class DynamicFragment

const dynamic::Value& GetSchema() { return schema_; }

auto CollectPropertyKeysOnVertices()
-> bl::result<std::map<std::string, dynamic::Type>> {
std::map<std::string, dynamic::Type> prop_keys;

for (const auto& v : InnerVertices()) {
auto& data = ivdata_[v.GetValue()];

for (auto member = data.MemberBegin(); member != data.MemberEnd();
++member) {
std::string s_k = member->name.GetString();

if (prop_keys.find(s_k) == prop_keys.end()) {
prop_keys[s_k] = dynamic::GetType(member->value);
} else {
auto seen_type = prop_keys[s_k];
auto curr_type = dynamic::GetType(member->value);

if (seen_type != curr_type) {
std::stringstream ss;
ss << "OID: " << GetId(v) << " has key " << s_k << " with type "
<< curr_type << " but previous type is: " << seen_type;
RETURN_GS_ERROR(vineyard::ErrorCode::kDataTypeError, ss.str());
}
}
}
}

return prop_keys;
}

auto CollectPropertyKeysOnEdges()
-> bl::result<std::map<std::string, dynamic::Type>> {
std::map<std::string, dynamic::Type> prop_keys;

auto extract_keys = [this, &prop_keys](
const vertex_t& u,
const adj_list_t& es) -> bl::result<void> {
for (auto& e : es) {
auto& data = e.data;

for (auto member = data.MemberBegin(); member != data.MemberEnd();
++member) {
std::string s_k = member->name.GetString();

if (prop_keys.find(s_k) == prop_keys.end()) {
prop_keys[s_k] = dynamic::GetType(member->value);
} else {
auto seen_type = prop_keys[s_k];
auto curr_type = dynamic::GetType(member->value);

if (seen_type != curr_type) {
std::stringstream ss;
ss << "Edge (OID): " << GetId(u) << " " << GetId(e.neighbor)
<< " has key " << s_k << " with type " << curr_type
<< " but previous type is: " << seen_type;
RETURN_GS_ERROR(vineyard::ErrorCode::kDataTypeError, ss.str());
}
}
}
}
return {};
};

for (const auto& v : InnerVertices()) {
if (load_strategy_ == grape::LoadStrategy::kOnlyIn ||
load_strategy_ == grape::LoadStrategy::kBothOutIn) {
auto es = this->GetIncomingAdjList(v);
if (es.NotEmpty()) {
BOOST_LEAF_CHECK(extract_keys(v, es));
}
}

if (load_strategy_ == grape::LoadStrategy::kOnlyOut ||
load_strategy_ == grape::LoadStrategy::kBothOutIn) {
auto es = this->GetOutgoingAdjList(v);
if (es.NotEmpty()) {
BOOST_LEAF_CHECK(extract_keys(v, es));
}
}
}

return prop_keys;
}

bl::result<dynamic::Type> GetOidType(const grape::CommSpec& comm_spec) const {
auto oid_type = dynamic::Type::kNullType;
if (this->alive_ivnum_ > 0) {
// Get first alive vertex oid type.
for (vid_t lid = 0; lid < ivnum_; ++lid) {
if (iv_alive_.get_bit(lid)) {
oid_t oid;
vm_ptr_->GetOid(fid_, lid, oid);
oid_type = dynamic::GetType(oid);
}
}
}
grape::Communicator comm;
dynamic::Type max_type;
comm.InitCommunicator(comm_spec.comm());
comm.Max(oid_type, max_type);

if (max_type != dynamic::Type::kInt64Type &&
max_type != dynamic::Type::kDoubleType &&
max_type != dynamic::Type::kStringType &&
max_type != dynamic::Type::kNullType) {
LOG(ERROR) << "Unsupported oid type.";
return dynamic::Type::kNullType;
}
return max_type;
}

public:
using base_t::GetOutgoingAdjList;
inline adj_list_t GetIncomingAdjList(const vertex_t& v) override {
Expand Down
8 changes: 0 additions & 8 deletions analytical_engine/core/fragment/dynamic_projected_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -577,10 +577,6 @@ class DynamicProjectedFragment {
return fragment_->HasNode(node);
}

bl::result<dynamic::Type> GetOidType(const grape::CommSpec& comm_spec) const {
return fragment_->GetOidType(comm_spec);
}

private:
fragment_t* fragment_;
std::string v_prop_key_;
Expand Down Expand Up @@ -816,10 +812,6 @@ class DynamicProjectedFragment<grape::EmptyType, grape::EmptyType> {
return fragment_->HasNode(node);
}

bl::result<dynamic::Type> GetOidType(const grape::CommSpec& comm_spec) const {
return fragment_->GetOidType(comm_spec);
}

private:
fragment_t* fragment_;
std::string v_prop_key_;
Expand Down
35 changes: 17 additions & 18 deletions analytical_engine/core/loader/dynamic_to_arrow_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -632,32 +632,32 @@ class DynamicToArrowConverter {
const std::shared_ptr<src_fragment_t>& src_frag) {
std::vector<std::shared_ptr<arrow::Field>> schema_vector;
std::vector<std::shared_ptr<arrow::Array>> arrays;
// TODO(weibin): Replace with schema of DynamicFragment.
BOOST_LEAF_AUTO(prop_keys, src_frag->CollectPropertyKeysOnVertices());
const auto& vertex_schema = src_frag->GetSchema()["vertex"];

// build schema and array
for (const auto& p : prop_keys) {
auto key = p.first;
auto type = p.second;
for (const auto& p : vertex_schema.GetObject()) {
std::string key = p.name.GetString();
int type = p.value.GetInt();
LOG(INFO) << key << " got type " << p.value.GetInt() << " " << type;

switch (type) {
case dynamic::Type::kInt64Type: {
case rpc::graph::DataTypePb::LONG: {
auto r = VertexArrayBuilder<arrow::Int64Builder>::build(src_frag, key);

BOOST_LEAF_AUTO(array, r);
schema_vector.push_back(arrow::field(key, arrow::int64()));
arrays.push_back(array);
break;
}
case dynamic::Type::kDoubleType: {
case rpc::graph::DataTypePb::DOUBLE: {
auto r = VertexArrayBuilder<arrow::DoubleBuilder>::build(src_frag, key);

BOOST_LEAF_AUTO(array, r);
schema_vector.push_back(arrow::field(key, arrow::float64()));
arrays.push_back(array);
break;
}
case dynamic::Type::kStringType: {
case rpc::graph::DataTypePb::STRING: {
auto r =
VertexArrayBuilder<arrow::LargeStringBuilder>::build(src_frag, key);

Expand All @@ -668,7 +668,7 @@ class DynamicToArrowConverter {
}
default:
RETURN_GS_ERROR(vineyard::ErrorCode::kDataTypeError,
"Unsupported dynamic type: " + std::to_string(type));
"Unsupported type: " + std::to_string(type));
}
}

Expand Down Expand Up @@ -696,14 +696,13 @@ class DynamicToArrowConverter {
CHECK_EQ(src_array->length(), dst_array->length());
std::vector<std::shared_ptr<arrow::Array>> arrays{src_array, dst_array};

BOOST_LEAF_AUTO(prop_keys, src_frag->CollectPropertyKeysOnEdges());
// build schema and array
for (const auto& e : prop_keys) {
auto key = e.first;
auto type = e.second;

const auto& edge_schema = src_frag->GetSchema()["edge"];
for (const auto& p : edge_schema.GetObject()) {
std::string key = p.name.GetString();
int type = p.value.GetInt();
switch (type) {
case dynamic::Type::kInt64Type: {
case rpc::graph::DataTypePb::LONG: {
auto r = EdgeArrayBuilder<arrow::Int64Builder>::build(src_frag, key);

BOOST_LEAF_AUTO(array, r);
Expand All @@ -712,15 +711,15 @@ class DynamicToArrowConverter {
arrays.push_back(array);
break;
}
case dynamic::Type::kDoubleType: {
case rpc::graph::DataTypePb::DOUBLE: {
auto r = EdgeArrayBuilder<arrow::DoubleBuilder>::build(src_frag, key);

BOOST_LEAF_AUTO(array, r);
schema_vector.push_back(arrow::field(key, arrow::float64()));
arrays.push_back(array);
break;
}
case dynamic::Type::kStringType: {
case rpc::graph::DataTypePb::STRING: {
auto r =
EdgeArrayBuilder<arrow::LargeStringBuilder>::build(src_frag, key);

Expand All @@ -731,7 +730,7 @@ class DynamicToArrowConverter {
}
default:
RETURN_GS_ERROR(vineyard::ErrorCode::kDataTypeError,
"Unsupported dynamic type: " + std::to_string(type));
"Unsupported type: " + std::to_string(type));
}
}

Expand Down
2 changes: 1 addition & 1 deletion analytical_engine/core/object/fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ class FragmentWrapper<vineyard::ArrowFragment<OID_T, VID_T>>

switch (selector.type()) {
case SelectorType::kVertexId: {
auto oid_type = trans_utils.GetOidTypeId();
BOOST_LEAF_AUTO(oid_type, trans_utils.GetOidTypeId());
if (comm_spec.fid() == 0) {
*arc << static_cast<int>(oid_type);
*arc << total_num;
Expand Down

0 comments on commit 9357e13

Please sign in to comment.