Skip to content

Commit

Permalink
[networkx] implement deepcopy subgraph/edge_subgraph in engine, not r…
Browse files Browse the repository at this point in the history
…eturn a view (#258)
  • Loading branch information
acezen committed Apr 25, 2021
1 parent 31922c1 commit 67a18a7
Show file tree
Hide file tree
Showing 10 changed files with 451 additions and 146 deletions.
189 changes: 189 additions & 0 deletions analytical_engine/core/fragment/dynamic_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,67 @@ class DynamicFragment {
InvalidCache();
}

// induce a subgraph that contains the induced_vertices and the edges between
// those vertices or a edge subgraph that contains the induced_edges and the
// nodes incident to induced_edges.
void InduceSubgraph(
std::shared_ptr<DynamicFragment> origin,
const std::unordered_set<oid_t>& induced_vertices,
const std::vector<std::pair<oid_t, oid_t>>& induced_edges) {
// copy base elements
directed_ = origin->directed();
directed() ? load_strategy_ = grape::LoadStrategy::kBothOutIn
: load_strategy_ = grape::LoadStrategy::kOnlyOut;
fid_ = origin->fid();
fnum_ = vm_ptr_->GetFragmentNum();
calcFidBitWidth(fnum_, id_mask_, fid_offset_);

ivnum_ = vm_ptr_->GetInnerVertexSize(fid_);
ovnum_ = 0;
oenum_ = 0;
ienum_ = 0;

inner_ie_pos_.clear();
inner_oe_pos_.clear();
inner_ie_pos_.resize(ivnum_, -1);
inner_oe_pos_.resize(ivnum_, -1);
inner_vertex_alive_.resize(ivnum_, false);

vdata_.clear();
vdata_.resize(ivnum_);

// induce active edges
std::vector<edge_t> edges;
if (induced_edges.empty()) {
induceFromVertices(origin, induced_vertices, edges);
} else {
induceFromEdges(origin, induced_edges, edges);
}

{
// find out outer vertices and calculate outer vertices num
std::vector<vid_t> outer_vertices =
getOuterVerticesAndInvalidEdges(edges, load_strategy_);

grape::DistinctSort(outer_vertices);

ovgid_.resize(outer_vertices.size());
memcpy(&ovgid_[0], &outer_vertices[0],
outer_vertices.size() * sizeof(vid_t));
for (auto gid : ovgid_) {
ovg2i_.emplace(gid, ovnum_);
++ovnum_;
}
}

tvnum_ = ivnum_ + ovnum_;
alive_ovnum_ = ovnum_;
outer_vertex_alive_.resize(ovnum_, true);

AddEdges(edges, load_strategy_);
initOuterVerticesOfFragment();
}

void ClearEdges() {
// clear outer vertices.
ovgid_.clear();
Expand Down Expand Up @@ -1346,6 +1407,16 @@ class DynamicFragment {
return true;
}
}
} else if ((vid >> fid_offset_) == fid_ && Gid2Lid(uid, ulid) &&
Gid2Lid(vid, vlid) && isAlive(vlid)) {
int32_t pos;
directed() ? pos = inner_ie_pos_[vlid] : pos = inner_oe_pos_[vlid];
if (pos != -1) {
auto& es = inner_edge_space_[pos];
if (es.find(ulid) != es.end()) {
return true;
}
}
}
}
return false;
Expand Down Expand Up @@ -1380,6 +1451,36 @@ class DynamicFragment {
return false;
}

inline bool GetEdgeData(const oid_t& u, const oid_t& v, edata_t& data) {
vid_t uid, vid;
if (Oid2Gid(u, uid) && Oid2Gid(v, vid)) {
vid_t ulid, vlid;
if ((uid >> fid_offset_) == fid_ && Gid2Lid(uid, ulid) &&
Gid2Lid(vid, vlid) && isAlive(ulid)) {
auto pos = inner_oe_pos_[ulid];
if (pos != -1) {
auto& oe = inner_edge_space_[pos];
if (oe.find(vlid) != oe.end()) {
data = oe[vlid].data();
return true;
}
}
} else if ((vid >> fid_offset_) == fid_ && Gid2Lid(uid, ulid) &&
Gid2Lid(vid, vlid) && isAlive(vlid)) {
int32_t pos;
directed() ? pos = inner_ie_pos_[vlid] : pos = inner_oe_pos_[vlid];
if (pos != -1) {
auto& es = inner_edge_space_[pos];
if (es.find(ulid) != es.end()) {
data = es[ulid].data();
return true;
}
}
}
}
return false;
}

void ModifyEdges(const std::vector<std::string>& edges_to_modify,
const rpc::ModifyType modify_type) {
std::vector<internal_vertex_t> vertices;
Expand Down Expand Up @@ -2364,6 +2465,94 @@ class DynamicFragment {
}
}

// induce subgraph from induced_nodes
void induceFromVertices(std::shared_ptr<DynamicFragment>& origin,
const std::unordered_set<oid_t>& induced_vertices,
std::vector<edge_t>& edges) {
vertex_t vertex;
vid_t gid, dst_gid;
for (auto& oid : induced_vertices) {
if (origin->GetVertex(oid, vertex) && origin->IsInnerVertex(vertex)) {
// store the vertex data
CHECK(vm_ptr_->GetGid(fid_, oid, gid));
vdata_[(gid & id_mask_)] = origin->GetData(vertex);
inner_vertex_alive_[(gid & id_mask_)] = true;

for (auto& e : origin->GetOutgoingAdjList(vertex)) {
auto dst_oid = origin->GetId(e.get_neighbor());
if (induced_vertices.find(dst_oid) != induced_vertices.end()) {
CHECK(Oid2Gid(dst_oid, dst_gid));
edges.emplace_back(gid, dst_gid, e.get_data());
}
}
if (directed()) {
// filter the cross-fragment incoming edges
for (auto& e : origin->GetIncomingAdjList(vertex)) {
if (origin->IsOuterVertex(e.get_neighbor())) {
auto dst_oid = origin->GetId(e.get_neighbor());
if (induced_vertices.find(dst_oid) != induced_vertices.end()) {
CHECK(Oid2Gid(dst_oid, dst_gid));
edges.emplace_back(dst_gid, gid, e.get_data());
}
}
}
}
}
}
// since we filtering alive vertex in vertex map construct, alive_ivnum
// equal to ivnum
alive_ivnum_ = ivnum_;
}

// induce edge_subgraph from induced_edges
void induceFromEdges(
std::shared_ptr<DynamicFragment>& origin,
const std::vector<std::pair<oid_t, oid_t>>& induced_edges,
std::vector<edge_t>& edges) {
vertex_t vertex;
vid_t gid, dst_gid;
edata_t edata;
for (auto& e : induced_edges) {
const auto& src_oid = e.first;
const auto& dst_oid = e.second;
if (origin->HasEdge(src_oid, dst_oid)) {
if (vm_ptr_->GetGid(fid_, src_oid, gid)) {
// src is inner vertex
CHECK(origin->GetVertex(src_oid, vertex));
vdata_[(gid & id_mask_)] = origin->GetData(vertex);
inner_vertex_alive_[(gid & id_mask_)] = true;
CHECK(vm_ptr_->GetGid(dst_oid, dst_gid));
CHECK(origin->GetEdgeData(src_oid, dst_oid, edata));
edges.emplace_back(gid, dst_gid, edata);
if ((dst_gid >> fid_offset_) == fid_ && gid != dst_gid) {
// dst is inner vertex too
CHECK(origin->GetVertex(dst_oid, vertex));
vdata_[(dst_gid & id_mask_)] = origin->GetData(vertex);
inner_vertex_alive_[(dst_gid & id_mask_)] = true;
if (!directed_) {
edges.emplace_back(dst_gid, gid, edata);
}
}
} else if (vm_ptr_->GetGid(fid_, dst_oid, dst_gid)) {
// dst is inner vertex but src is outer vertex
CHECK(origin->GetVertex(dst_oid, vertex));
vdata_[(dst_gid & id_mask_)] = origin->GetData(vertex);
inner_vertex_alive_[(dst_gid & id_mask_)] = true;
CHECK(vm_ptr_->GetGid(src_oid, gid));
origin->GetEdgeData(src_oid, dst_oid, edata);
directed() ? edges.emplace_back(gid, dst_gid, edata)
: edges.emplace_back(dst_gid, gid, edata);
}
}
}
// init alive_ivnum with inner_vertex_alive_ array
for (size_t i = 0; i < ivnum_; ++i) {
if (inner_vertex_alive_[i]) {
++alive_ivnum_;
}
}
}

inline const char* getTypeName(folly::dynamic::Type type) const {
switch (type) {
case folly::dynamic::Type::INT64:
Expand Down
88 changes: 87 additions & 1 deletion analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/

#include <memory>
#include <unordered_set>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -637,7 +638,6 @@ bl::result<rpc::GraphDef> GrapeInstance::toUnDirected(
const rpc::GSParams& params) {
#ifdef EXPERIMENTAL_ON
BOOST_LEAF_AUTO(src_graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
// BOOST_LEAF_AUTO(copy_type, params.Get<std::string>(rpc::COPY_TYPE));

BOOST_LEAF_AUTO(src_wrapper,
object_manager_.GetObject<IFragmentWrapper>(src_graph_name));
Expand All @@ -653,6 +653,52 @@ bl::result<rpc::GraphDef> GrapeInstance::toUnDirected(
#endif // EXPERIMENTAL_ON
}

#ifdef EXPERIMENTAL_ON
bl::result<rpc::GraphDef> GrapeInstance::induceSubGraph(
const rpc::GSParams& params,
const std::unordered_set<typename DynamicFragment::oid_t>& induced_vertices,
const std::vector<std::pair<typename DynamicFragment::oid_t,
typename DynamicFragment::oid_t>>&
induced_edges) {
BOOST_LEAF_AUTO(src_graph_name, params.Get<std::string>(rpc::GRAPH_NAME));

BOOST_LEAF_AUTO(src_wrapper,
object_manager_.GetObject<IFragmentWrapper>(src_graph_name));
std::string sub_graph_name = "graph_" + generateId();

VLOG(1) << "Inducing subgraph from " << src_graph_name
<< ", graph name: " << sub_graph_name;

auto fragment =
std::static_pointer_cast<DynamicFragment>(src_wrapper->fragment());

auto sub_vm_ptr =
std::make_shared<typename DynamicFragment::vertex_map_t>(comm_spec_);
sub_vm_ptr->Init();
typename DynamicFragment::partitioner_t partitioner;
partitioner.Init(fragment->fnum());
typename DynamicFragment::vid_t gid;
for (auto& v : induced_vertices) {
auto fid = partitioner.GetPartitionId(v);
if (fid == fragment->fid() && fragment->HasNode(v)) {
sub_vm_ptr->AddVertex(fid, v, gid);
}
}
sub_vm_ptr->Construct();

auto sub_graph_def = src_wrapper->graph_def();
sub_graph_def.set_key(sub_graph_name);
auto sub_frag = std::make_shared<DynamicFragment>(sub_vm_ptr);
sub_frag->InduceSubgraph(fragment, induced_vertices, induced_edges);

auto wrapper = std::make_shared<FragmentWrapper<DynamicFragment>>(
sub_graph_name, sub_graph_def, sub_frag);

BOOST_LEAF_CHECK(object_manager_.PutObject(wrapper));
return wrapper->graph_def();
}
#endif // EXPERIMENTAL_ON

bl::result<void> GrapeInstance::clearEdges(const rpc::GSParams& params) {
#ifdef EXPERIMENTAL_ON
BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
Expand Down Expand Up @@ -898,6 +944,46 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
#else
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"GS is built with experimental off");
#endif
break;
}
case rpc::INDUCE_SUBGRAPH: {
#ifdef EXPERIMENTAL_ON
std::unordered_set<DynamicFragment::oid_t> induced_vertices;
std::vector<std::pair<DynamicFragment::oid_t, DynamicFragment::oid_t>>
induced_edges;
auto line_parser_ptr = std::make_unique<DynamicLineParser>();
if (params.HasKey(rpc::NODES)) {
// induce subgraph from nodes.
int size = cmd.params.at(rpc::NODES).list().s_size();
induced_vertices.reserve(size);
DynamicFragment::oid_t oid;
DynamicFragment::vdata_t vdata;
for (int i = 0; i < size; ++i) {
line_parser_ptr->LineParserForVFile(
cmd.params.at(rpc::NODES).list().s(i), oid, vdata);
induced_vertices.insert(oid);
}
} else if (params.HasKey(rpc::EDGES)) {
// induce subgraph from edges.
int size = cmd.params.at(rpc::EDGES).list().s_size();
induced_edges.reserve(size);
DynamicFragment::oid_t u_oid, v_oid;
DynamicFragment::edata_t edata;
for (int i = 0; i < size; ++i) {
line_parser_ptr->LineParserForEFile(
cmd.params.at(rpc::EDGES).list().s(i), u_oid, v_oid, edata);
induced_vertices.insert(u_oid);
induced_vertices.insert(v_oid);
induced_edges.emplace_back(u_oid, v_oid);
}
}
BOOST_LEAF_AUTO(graph_def,
induceSubGraph(params, induced_vertices, induced_edges));
r->set_graph_def(graph_def);
#else
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"GS is built with experimental off");
#endif
break;
}
Expand Down
12 changes: 12 additions & 0 deletions analytical_engine/core/grape_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <map>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

Expand All @@ -33,6 +34,7 @@
#include "grape/worker/comm_spec.h"

#include "core/context/i_context.h"
#include "core/fragment/dynamic_fragment.h"
#include "core/object/object_manager.h"
#include "core/server/dispatcher.h"
#include "core/server/graphscope_service.h"
Expand Down Expand Up @@ -126,6 +128,16 @@ class GrapeInstance : public Subscriber {

bl::result<rpc::GraphDef> createGraphView(const rpc::GSParams& params);

#ifdef EXPERIMENTAL_ON
bl::result<rpc::GraphDef> induceSubGraph(
const rpc::GSParams& params,
const std::unordered_set<typename DynamicFragment::oid_t>&
induced_vertices,
const std::vector<std::pair<typename DynamicFragment::oid_t,
typename DynamicFragment::oid_t>>&
induced_edges);
#endif // EXPERIMENTAL_ON

bl::result<rpc::GraphDef> addLabelsToGraph(const rpc::GSParams& params);

bl::result<std::shared_ptr<grape::InArchive>> graphToNumpy(
Expand Down
1 change: 1 addition & 0 deletions proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ enum OperationType {
TO_UNDIRECTED = 18; // return graph, generate undirected graph from directed graph
CLEAR_EDGES = 19; // clear edges
VIEW_GRAPH = 20;
INDUCE_SUBGRAPH = 21; // clear edges

// data
CONTEXT_TO_NUMPY = 50;
Expand Down
Loading

0 comments on commit 67a18a7

Please sign in to comment.