Skip to content

Commit

Permalink
Move networkx graph schema from client to engine (#1485)
Browse files Browse the repository at this point in the history
  • Loading branch information
acezen committed May 5, 2022
1 parent 617ae63 commit db579a7
Show file tree
Hide file tree
Showing 26 changed files with 343 additions and 530 deletions.
45 changes: 44 additions & 1 deletion analytical_engine/core/fragment/dynamic_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
#include "grape/utils/vertex_set.h"

#include "core/object/dynamic.h"
#include "core/utils/convert_utils.h"
#include "core/utils/partitioner.h"
#include "core/utils/transform_utils.h"
#include "proto/graphscope/proto/types.pb.h"

namespace gs {
Expand Down Expand Up @@ -178,6 +178,8 @@ class DynamicFragment
}
}
}

initSchema();
}

// Init an empty fragment.
Expand Down Expand Up @@ -238,6 +240,8 @@ class DynamicFragment
},
thread_num, 1);
}

initSchema();
}

using base_t::Gid2Lid;
Expand Down Expand Up @@ -535,6 +539,8 @@ class DynamicFragment
} else {
LOG(ERROR) << "Unsupported copy type: " << copy_type;
}

this->schema_.CopyFrom(source->schema_);
}

// generate directed graph from original undirected graph.
Expand All @@ -560,6 +566,8 @@ class DynamicFragment
oe_.put_edge(i, *iter);
}
}

this->schema_.CopyFrom(source->schema_);
}

// generate undirected graph from original directed graph.
Expand All @@ -585,6 +593,7 @@ class DynamicFragment
}

Mutate(mutation);
this->schema_.CopyFrom(source->schema_);
}

// induce a subgraph that contains the induced_vertices and the edges between
Expand Down Expand Up @@ -676,6 +685,8 @@ class DynamicFragment
: ov_alive_.get_bit(outerVertexLidToIndex(v.GetValue()));
}

// Returns a read-only reference to this fragment's property schema (the
// schema_ member). The reference aliases the member rather than copying it,
// so it is only usable while the fragment itself is alive.
const dynamic::Value& GetSchema() { return schema_; }

auto CollectPropertyKeysOnVertices()
-> bl::result<std::map<std::string, dynamic::Type>> {
std::map<std::string, dynamic::Type> prop_keys;
Expand Down Expand Up @@ -1472,6 +1483,12 @@ class DynamicFragment
oe_.sort_neighbors_dense(oe_degree);
}

// Puts schema_ into its initial shape: an object holding two object-typed
// entries, "vertex" and "edge", each created from rapidjson::kObjectType.
void initSchema() {
  schema_.SetObject();
  // Sections are inserted in this order: vertex first, then edge.
  const char* const section_names[] = {"vertex", "edge"};
  for (const char* section_name : section_names) {
    schema_.Insert(section_name, dynamic::Value(rapidjson::kObjectType));
  }
}

private:
using base_t::ivnum_;
vid_t ovnum_;
Expand All @@ -1494,13 +1511,17 @@ class DynamicFragment
// allocators for parallel convert
std::shared_ptr<std::vector<dynamic::AllocatorT>> allocators_;

dynamic::Value schema_;

using base_t::outer_vertices_of_frag_;

template <typename _vdata_t, typename _edata_t>
friend class DynamicProjectedFragment;

template <typename FRAG_T>
friend class ArrowToDynamicConverter;

friend class DynamicFragmentMutator;
};

class DynamicFragmentMutator {
Expand Down Expand Up @@ -1545,6 +1566,17 @@ class DynamicFragmentMutator {
v_fid = partitioner.GetPartitionId(oid);
if (modify_type == rpc::NX_ADD_NODES) {
vm_ptr_->AddVertex(std::move(oid), gid);
if (!v_data.Empty()) {
for (const auto& prop : v_data.GetObject()) {
if (!fragment_->schema_["vertex"].HasMember(prop.name)) {
dynamic::Value key(prop.name);
fragment_->schema_["vertex"].AddMember(
key,
dynamic::DynamicType2RpcType(dynamic::GetType(prop.value)),
dynamic::Value::allocator_);
}
}
}
if (v_fid == fid) {
mutation.vertices_to_add.emplace_back(gid, std::move(v_data));
}
Expand Down Expand Up @@ -1603,6 +1635,17 @@ class DynamicFragmentMutator {
vdata_t empty_data(rapidjson::kObjectType);
mutation.vertices_to_add.emplace_back(dst_gid, std::move(empty_data));
}
if (!e_data.Empty()) {
for (const auto& prop : e_data.GetObject()) {
if (!fragment_->schema_["edge"].HasMember(prop.name)) {
dynamic::Value key(prop.name);
fragment_->schema_["edge"].AddMember(
key,
dynamic::DynamicType2RpcType(dynamic::GetType(prop.value)),
dynamic::Value::allocator_);
}
}
}
} else {
if (!vm_ptr_->_GetGid(src_fid, src, src_gid) ||
!vm_ptr_->_GetGid(dst_fid, dst, dst_gid)) {
Expand Down
76 changes: 48 additions & 28 deletions analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,9 @@ bl::result<rpc::graph::GraphDefPb> GrapeInstance::loadGraph(
using fragment_t = DynamicFragment;
using vertex_map_t = typename fragment_t::vertex_map_t;
BOOST_LEAF_AUTO(directed, params.Get<bool>(rpc::DIRECTED));
BOOST_LEAF_AUTO(distributed, params.Get<bool>(rpc::DISTRIBUTED));

VLOG(1) << "Loading graph, graph name: " << graph_name
<< ", graph type: DynamicFragment, directed: " << directed
<< ", distributed: " << distributed;
<< ", graph type: DynamicFragment, directed: " << directed;

auto vm_ptr = std::shared_ptr<vertex_map_t>(new vertex_map_t(comm_spec_));
vm_ptr->Init();
Expand All @@ -93,22 +91,13 @@ bl::result<rpc::graph::GraphDefPb> GrapeInstance::loadGraph(
graph_def.set_directed(directed);
graph_def.set_graph_type(rpc::graph::DYNAMIC_PROPERTY);
// dynamic graph doesn't have a vineyard id
gs::rpc::graph::VineyardInfoPb vy_info;
gs::rpc::graph::MutableGraphInfoPb graph_info;
if (graph_def.has_extension()) {
graph_def.extension().UnpackTo(&vy_info);
graph_def.extension().UnpackTo(&graph_info);
}
vy_info.set_vineyard_id(-1);

vy_info.set_oid_type(PropertyTypeToPb(vineyard::normalize_datatype(
vineyard::type_name<typename gs::DynamicFragment::oid_t>())));
vy_info.set_vid_type(PropertyTypeToPb(vineyard::normalize_datatype(
vineyard::type_name<typename gs::DynamicFragment::vid_t>())));
vy_info.set_vdata_type(PropertyTypeToPb(vineyard::normalize_datatype(
vineyard::type_name<typename gs::DynamicFragment::vdata_t>())));
vy_info.set_edata_type(PropertyTypeToPb(vineyard::normalize_datatype(
vineyard::type_name<typename gs::DynamicFragment::edata_t>())));
vy_info.set_property_schema_json("{}");
graph_def.mutable_extension()->PackFrom(vy_info);
graph_info.set_property_schema_json(
dynamic::Stringify(fragment->GetSchema()));
graph_def.mutable_extension()->PackFrom(graph_info);

auto wrapper = std::make_shared<FragmentWrapper<fragment_t>>(
graph_name, graph_def, fragment);
Expand Down Expand Up @@ -177,6 +166,7 @@ bl::result<void> GrapeInstance::unloadGraph(const rpc::GSParams& params) {
}
}
}
VLOG(1) << "Unloading Graph " << graph_name;
return object_manager_.RemoveObject(graph_name);
}

Expand Down Expand Up @@ -222,18 +212,19 @@ bl::result<rpc::graph::GraphDefPb> GrapeInstance::projectGraph(

bl::result<rpc::graph::GraphDefPb> GrapeInstance::projectToSimple(
const rpc::GSParams& params) {
std::string projected_id = "graph_projected_" + generateId();
std::string projected_graph_name = "graph_projected_" + generateId();
BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
BOOST_LEAF_AUTO(type_sig, params.Get<std::string>(rpc::TYPE_SIGNATURE));

VLOG(1) << "Projecting graph, dst graph name: " << graph_name
VLOG(1) << "Projecting graph " << graph_name
<< " to simple graph: " << projected_graph_name
<< ", type sig: " << type_sig;

BOOST_LEAF_AUTO(wrapper,
object_manager_.GetObject<IFragmentWrapper>(graph_name));
BOOST_LEAF_AUTO(projector, object_manager_.GetObject<Projector>(type_sig));
BOOST_LEAF_AUTO(projected_wrapper,
projector->Project(wrapper, projected_id, params));
projector->Project(wrapper, projected_graph_name, params));
BOOST_LEAF_CHECK(object_manager_.PutObject(projected_wrapper));

return projected_wrapper->graph_def();
Expand Down Expand Up @@ -279,13 +270,15 @@ bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::reportGraph(
return wrapper->ReportGraph(comm_spec_, params);
}

bl::result<void> GrapeInstance::modifyVertices(const rpc::GSParams& params) {
bl::result<rpc::graph::GraphDefPb> GrapeInstance::modifyVertices(
const rpc::GSParams& params) {
#ifdef NETWORKX
BOOST_LEAF_AUTO(modify_type, params.Get<rpc::ModifyType>(rpc::MODIFY_TYPE));
BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
BOOST_LEAF_AUTO(wrapper,
object_manager_.GetObject<IFragmentWrapper>(graph_name));
auto graph_type = wrapper->graph_def().graph_type();
auto& graph_def = wrapper->mutable_graph_def();
auto graph_type = graph_def.graph_type();

if (graph_type != rpc::graph::DYNAMIC_PROPERTY) {
RETURN_GS_ERROR(
Expand All @@ -306,21 +299,31 @@ bl::result<void> GrapeInstance::modifyVertices(const rpc::GSParams& params) {
std::static_pointer_cast<DynamicFragment>(wrapper->fragment());
DynamicFragmentMutator mutator(comm_spec_, fragment);
mutator.ModifyVertices(nodes, common_attr, modify_type);
return {};
// update schema in graph_def
gs::rpc::graph::MutableGraphInfoPb graph_info;
if (graph_def.has_extension()) {
graph_def.extension().UnpackTo(&graph_info);
}
graph_info.set_property_schema_json(
dynamic::Stringify(fragment->GetSchema()));
graph_def.mutable_extension()->PackFrom(graph_info);
return graph_def;
#else
RETURN_GS_ERROR(vineyard::ErrorCode::kUnimplementedMethod,
"GraphScope is built with NETWORKX=OFF, please recompile it "
"with NETWORKX=ON");
#endif
}

bl::result<void> GrapeInstance::modifyEdges(const rpc::GSParams& params) {
bl::result<rpc::graph::GraphDefPb> GrapeInstance::modifyEdges(
const rpc::GSParams& params) {
#ifdef NETWORKX
BOOST_LEAF_AUTO(modify_type, params.Get<rpc::ModifyType>(rpc::MODIFY_TYPE));
BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
BOOST_LEAF_AUTO(wrapper,
object_manager_.GetObject<IFragmentWrapper>(graph_name));
auto graph_type = wrapper->graph_def().graph_type();
auto& graph_def = wrapper->mutable_graph_def();
auto graph_type = graph_def.graph_type();

if (graph_type != rpc::graph::DYNAMIC_PROPERTY) {
RETURN_GS_ERROR(
Expand All @@ -343,12 +346,20 @@ bl::result<void> GrapeInstance::modifyEdges(const rpc::GSParams& params) {
std::static_pointer_cast<DynamicFragment>(wrapper->fragment());
DynamicFragmentMutator mutator(comm_spec_, fragment);
mutator.ModifyEdges(edges, common_attr, modify_type, weight);
// update schema in graph_def
gs::rpc::graph::MutableGraphInfoPb graph_info;
if (graph_def.has_extension()) {
graph_def.extension().UnpackTo(&graph_info);
}
graph_info.set_property_schema_json(
dynamic::Stringify(fragment->GetSchema()));
graph_def.mutable_extension()->PackFrom(graph_info);
#else
RETURN_GS_ERROR(vineyard::ErrorCode::kUnimplementedMethod,
"GraphScope is built with NETWORKX=OFF, please recompile it "
"with NETWORKX=ON");
#endif // NETWORKX
return {};
return graph_def;
}

bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::contextToNumpy(
Expand Down Expand Up @@ -968,6 +979,13 @@ bl::result<rpc::graph::GraphDefPb> GrapeInstance::induceSubGraph(
sub_graph_def.set_key(sub_graph_name);
auto sub_frag = std::make_shared<DynamicFragment>(sub_vm_ptr);
sub_frag->InduceSubgraph(fragment, induced_vertices, induced_edges);
gs::rpc::graph::MutableGraphInfoPb graph_info;
if (sub_graph_def.has_extension()) {
sub_graph_def.extension().UnpackTo(&graph_info);
}
graph_info.set_property_schema_json(
dynamic::Stringify(sub_frag->GetSchema()));
sub_graph_def.mutable_extension()->PackFrom(graph_info);

auto wrapper = std::make_shared<FragmentWrapper<DynamicFragment>>(
sub_graph_name, sub_graph_def, sub_frag);
Expand Down Expand Up @@ -1211,7 +1229,8 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
}
case rpc::MODIFY_VERTICES: {
#ifdef NETWORKX
BOOST_LEAF_CHECK(modifyVertices(params));
BOOST_LEAF_AUTO(graph_def, modifyVertices(params));
r->set_graph_def(graph_def);
#else
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"GraphScope is built with NETWORKX=OFF, please recompile "
Expand All @@ -1221,7 +1240,8 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
}
case rpc::MODIFY_EDGES: {
#ifdef NETWORKX
BOOST_LEAF_CHECK(modifyEdges(params));
BOOST_LEAF_AUTO(graph_def, modifyEdges(params));
r->set_graph_def(graph_def);
#else
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"GraphScope is built with NETWORKX=OFF, please recompile "
Expand Down
5 changes: 3 additions & 2 deletions analytical_engine/core/grape_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ class GrapeInstance : public Subscriber {
bl::result<rpc::graph::GraphDefPb> projectToSimple(
const rpc::GSParams& params);

bl::result<void> modifyVertices(const rpc::GSParams& params);
bl::result<rpc::graph::GraphDefPb> modifyVertices(
const rpc::GSParams& params);

bl::result<void> modifyEdges(const rpc::GSParams& params);
bl::result<rpc::graph::GraphDefPb> modifyEdges(const rpc::GSParams& params);

bl::result<void> clearEdges(const rpc::GSParams& params);

Expand Down
23 changes: 23 additions & 0 deletions analytical_engine/core/loader/arrow_to_dynamic_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ class ArrowToDynamicConverter {
dynamic_frag->Init(src_frag->fid(), src_frag->directed(), vertices, edges,
oe_degree, ie_degree, thread_num);

initFragmentSchema(dynamic_frag, src_frag->schema());

return dynamic_frag;
}

Expand All @@ -263,6 +265,27 @@ class ArrowToDynamicConverter {
return dynamic_id_parser_.generate_global_id(fid, offset);
}

// Seeds the dynamic fragment's schema from the source property graph schema.
//
// Every vertex property of every vertex label is recorded under
// frag->schema_["vertex"], and every edge property of every edge label under
// frag->schema_["edge"]; each property name maps to
// dynamic::Str2RpcType(<property type name>).
//
// A property name that is already present in its section is skipped, so a
// name shared by several labels yields a single entry (the first one seen)
// instead of duplicate keys in the schema object. This mirrors the HasMember
// guard DynamicFragmentMutator applies when it records new properties.
//
// frag:   destination fragment; its schema_ is expected to already contain
//         the "vertex" and "edge" object entries.
// schema: property graph schema of the source fragment.
void initFragmentSchema(std::shared_ptr<dst_fragment_t> frag,
                        const vineyard::PropertyGraphSchema& schema) {
  // init vertex properties schema
  auto& vertex_schema = frag->schema_["vertex"];
  for (label_id_t label_id = 0; label_id < schema.all_vertex_label_num();
       ++label_id) {
    for (const auto& p : schema.GetVertexPropertyListByLabel(label_id)) {
      dynamic::Value key(p.first);
      // Same property name may appear under several labels; keep one entry.
      if (vertex_schema.HasMember(key)) {
        continue;
      }
      vertex_schema.AddMember(key, dynamic::Str2RpcType(p.second),
                              dynamic::Value::allocator_);
    }
  }
  // init edge properties schema
  auto& edge_schema = frag->schema_["edge"];
  for (label_id_t label_id = 0; label_id < schema.all_edge_label_num();
       ++label_id) {
    for (const auto& p : schema.GetEdgePropertyListByLabel(label_id)) {
      dynamic::Value key(p.first);
      // Same property name may appear under several labels; keep one entry.
      if (edge_schema.HasMember(key)) {
        continue;
      }
      edge_schema.AddMember(key, dynamic::Str2RpcType(p.second),
                            dynamic::Value::allocator_);
    }
  }
}

grape::CommSpec comm_spec_;
label_id_t default_label_id_;
std::shared_ptr<typename src_fragment_t::vertex_map_t> arrow_vm_ptr_;
Expand Down
Loading

0 comments on commit db579a7

Please sign in to comment.