Skip to content

Commit

Permalink
[networkx] refactor clear API and implement number_of_selfloops API (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
acezen committed Apr 29, 2021
1 parent ed97d6c commit f1c87e3
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 191 deletions.
375 changes: 223 additions & 152 deletions analytical_engine/core/fragment/dynamic_fragment.h

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions analytical_engine/core/fragment/dynamic_fragment_reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class DynamicGraphReporter : public grape::Communicator {
case rpc::EDGE_NUM: {
return std::to_string(reportEdgeNum(fragment));
}
case rpc::SELFLOOPS_NUM: {
return std::to_string(reportSelfloopsNum(fragment));
}
case rpc::HAS_NODE: {
BOOST_LEAF_AUTO(node_in_json, params.Get<std::string>(rpc::NODE));
oid_t node_id = folly::parseJson(node_in_json, json_opts_)[0];
Expand Down Expand Up @@ -143,6 +146,13 @@ class DynamicGraphReporter : public grape::Communicator {
return total_enum;
}

// Reports the self-loop count for the graph behind `fragment`.
//
// Reads this fragment's own count via fragment->selfloops_num(), combines it
// through Sum(), and returns the value Sum() wrote into its output argument.
// NOTE(review): Sum() is not defined in this view; presumably it is the
// cross-worker reduction inherited from grape::Communicator -- confirm.
inline size_t reportSelfloopsNum(std::shared_ptr<fragment_t>& fragment) {
  size_t local_selfloops = fragment->selfloops_num();
  size_t global_selfloops = 0;
  Sum(local_selfloops, global_selfloops);
  return global_selfloops;
}

bool hasNode(std::shared_ptr<fragment_t>& fragment, const oid_t& node) {
bool ret = false;
bool to_send = fragment->HasNode(node);
Expand Down
2 changes: 2 additions & 0 deletions analytical_engine/core/fragment/dynamic_fragment_view.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class DynamicFragmentView : public DynamicFragment {

inline int fid_offset() const { return fragment_->fid_offset(); }

// Returns the self-loop count of the wrapped fragment_ by forwarding to
// fragment_->selfloops_num(); the value is passed through unchanged.
inline size_t selfloops_num() const { return fragment_->selfloops_num(); }

inline bool directed() const {
switch (view_type_) {
case FragmentViewType::DIRECTED: {
Expand Down
35 changes: 35 additions & 0 deletions analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,32 @@ bl::result<rpc::GraphDef> GrapeInstance::induceSubGraph(
}
#endif // EXPERIMENTAL_ON

// Clears the graph identified by rpc::GRAPH_NAME in `params`.
//
// With EXPERIMENTAL_ON:
//   - looks the graph wrapper up in object_manager_ (lookup failures are
//     propagated by BOOST_LEAF_AUTO);
//   - returns kInvalidValueError unless the graph type is
//     rpc::DYNAMIC_PROPERTY;
//   - builds a new vertex map from comm_spec_, calls Init() on it, and hands
//     it to DynamicFragment::ClearGraph().
// Without EXPERIMENTAL_ON: returns kUnimplementedMethod.
//
// Returns an empty result on success.
bl::result<void> GrapeInstance::clearGraph(const rpc::GSParams& params) {
#ifdef EXPERIMENTAL_ON
  BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(wrapper,
                  object_manager_.GetObject<IFragmentWrapper>(graph_name));
  const auto graph_type = wrapper->graph_def().graph_type();

  // The static_pointer_cast below is only valid for dynamic fragments, so
  // reject every other graph type up front.
  if (graph_type != rpc::DYNAMIC_PROPERTY) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                    "Error graph type: " + std::to_string(graph_type) +
                        ", graph id: " + graph_name);
  }

  // make_shared instead of shared_ptr(new ...): no naked new, and the object
  // and its control block share one allocation.
  auto vm_ptr = std::make_shared<DynamicFragment::vertex_map_t>(comm_spec_);
  vm_ptr->Init();
  auto fragment =
      std::static_pointer_cast<DynamicFragment>(wrapper->fragment());
  fragment->ClearGraph(vm_ptr);
#else
  RETURN_GS_ERROR(vineyard::ErrorCode::kUnimplementedMethod,
                  "GS is compiled without folly");
#endif  // EXPERIMENTAL_ON
  return {};
}

bl::result<void> GrapeInstance::clearEdges(const rpc::GSParams& params) {
#ifdef EXPERIMENTAL_ON
BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
Expand Down Expand Up @@ -986,6 +1012,15 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
#else
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"GS is built with experimental off");
#endif
break;
}
case rpc::CLEAR_GRAPH: {
#ifdef EXPERIMENTAL_ON
BOOST_LEAF_CHECK(clearGraph(params));
#else
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"GS is built with experimental off");
#endif
break;
}
Expand Down
2 changes: 2 additions & 0 deletions analytical_engine/core/grape_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class GrapeInstance : public Subscriber {

bl::result<void> clearEdges(const rpc::GSParams& params);

// Clears the dynamic-property graph named by rpc::GRAPH_NAME in `params`;
// returns an error for any other graph type.
bl::result<void> clearGraph(const rpc::GSParams& params);

bl::result<std::shared_ptr<grape::InArchive>> contextToNumpy(
const rpc::GSParams& params);

Expand Down
6 changes: 4 additions & 2 deletions proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ enum OperationType {
TO_DIRECTED = 17; // return graph, generate directed graph from undirected graph
TO_UNDIRECTED = 18; // return graph, generate undirected graph from directed graph
CLEAR_EDGES = 19; // clear edges
VIEW_GRAPH = 20;
INDUCE_SUBGRAPH = 21; // clear edges
CLEAR_GRAPH = 20; // clear graph
VIEW_GRAPH = 21; // create graph view
INDUCE_SUBGRAPH = 22; // induce subgraph

// data
CONTEXT_TO_NUMPY = 50;
Expand Down Expand Up @@ -232,4 +233,5 @@ enum ReportType {
IN_DEG_BY_LOC = 16;
OUT_DEG_BY_LOC = 17;
NODES_BY_LOC = 18;
SELFLOOPS_NUM = 19;
}
65 changes: 31 additions & 34 deletions python/graphscope/experimental/nx/classes/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,36 +246,40 @@ def __init__(self, incoming_graph_data=None, **attr):
>>> G = nx.Graph(g) # or DiGraph, etc
"""
sess = get_default_session()
if sess is None:
raise ValueError(
"Cannot find a default session. "
"Please register a session using graphscope.session(...).as_default()"
)
self._session_id = sess.session_id
self.graph_attr_dict_factory = self.graph_attr_dict_factory
self.node_dict_factory = self.node_dict_factory
self.adjlist_dict_factory = self.adjlist_dict_factory
self.graph = self.graph_attr_dict_factory()
self._node = self.node_dict_factory(self)
self._adj = self.adjlist_dict_factory(self)

self._key = None
self._op = None
self._session_id = None
self._schema = GraphSchema()
self._schema.init_nx_schema()

if self._is_gs_graph(incoming_graph_data):
self._session_id = incoming_graph_data.session_id
else:
sess = get_default_session()
if sess is None:
raise ValueError(
"Cannot find a default session. "
"Please register a session using graphscope.session(...).as_default()"
)
self._session_id = sess.session_id

create_empty_in_engine = attr.pop(
"create_empty_in_engine", True
) # a hidden parameter
if not self.is_gs_graph(incoming_graph_data) and create_empty_in_engine:
if not self._is_gs_graph(incoming_graph_data) and create_empty_in_engine:
graph_def = empty_graph_in_engine(self, self.is_directed())
self._key = graph_def.key

self.graph_attr_dict_factory = self.graph_attr_dict_factory
self.node_dict_factory = self.node_dict_factory
self.adjlist_dict_factory = self.adjlist_dict_factory

self.graph = self.graph_attr_dict_factory()
self._node = self.node_dict_factory(self)
self._adj = self.adjlist_dict_factory(self)

# attempt to load graph with data
if incoming_graph_data is not None:
if self.is_gs_graph(incoming_graph_data):
if self._is_gs_graph(incoming_graph_data):
graph_def = from_gs_graph(incoming_graph_data, self)
self._key = graph_def.key
self._schema.init_nx_schema(incoming_graph_data.schema)
Expand All @@ -287,7 +291,7 @@ def __init__(self, incoming_graph_data=None, **attr):
self.graph.update(attr)
self._saved_signature = self.signature

def is_gs_graph(self, incoming_graph_data):
def _is_gs_graph(self, incoming_graph_data):
return (
hasattr(incoming_graph_data, "graph_type")
and incoming_graph_data.graph_type == types_pb2.ARROW_PROPERTY
Expand Down Expand Up @@ -1188,15 +1192,8 @@ def size(self, weight=None):
if weight:
return sum(d for v, d in self.degree(weight=weight)) / 2
else:
return sum(d for v, d in self.degree(weight=weight)) // 2
# TODO: make the selfloop edge number correct.
# else:
# config = dict()
# config['graph_name'] = self._graph_name
# config['graph_type'] = self._graph_type
# config['report_type'] = 'edge_num'
# op = report_graph(self, config=config)
# return int(get_default_session().run(op)) // 2
op = dag_utils.report_graph(self, types_pb2.EDGE_NUM)
return int(op.eval()) // 2

def number_of_edges(self, u=None, v=None):
"""Returns the number of edges between two nodes.
Expand Down Expand Up @@ -1245,13 +1242,16 @@ def number_of_edges(self, u=None, v=None):
"""
if u is None:
op = dag_utils.report_graph(self, types_pb2.EDGE_NUM)
return int(op.eval()) // 2
return self.size()
elif self.has_edge(u, v):
return 1
else:
return 0

def number_of_selfloops(self):
    """Return the number of self-loop edges reported for this graph.

    Builds a ``types_pb2.SELFLOOPS_NUM`` report operation with
    ``dag_utils.report_graph``, evaluates it, and converts the result
    to ``int``.

    Returns
    -------
    int
        The value produced by evaluating the report operation.
    """
    return int(dag_utils.report_graph(self, types_pb2.SELFLOOPS_NUM).eval())

def has_edge(self, u, v):
"""Returns True if the edge (u, v) is in the graph.
Expand Down Expand Up @@ -1526,14 +1526,11 @@ def degree(self):

def clear(self):
"""Remove all nodes and edges from the graph."""
# unload graph in grape, then create a new empty graph.
op = dag_utils.unload_graph(self)
op.eval()
self.graph.clear()
self.schema.clear()
graph_def = empty_graph_in_engine(self, self.is_directed())
self._key = graph_def.key
self.schema.init_nx_schema()
op = dag_utils.clear_graph(self)
op.eval()

def clear_edges(self):
op = dag_utils.clear_edges(self)
Expand Down
24 changes: 24 additions & 0 deletions python/graphscope/experimental/nx/tests/classes/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,30 @@ def test_node_type(self):
assert G["n"][3.14]["weight"] == 3.14
assert G[True][False]["weight"] == True

def test_selfloops(self):
    """Check edge and self-loop counts across edits, subgraphs and copies."""
    graph = self.Graph()

    # Build up: self-loop on 0, plain edge 0-1, self-loop on 2.
    graph.add_edge(0, 0)
    assert graph.number_of_edges() == 1
    graph.add_edge(0, 1)
    assert graph.number_of_selfloops() == 1
    graph.add_edge(2, 2)
    assert graph.number_of_edges() == 3
    assert graph.number_of_selfloops() == 2

    # Node-induced subgraph keeps only the self-loop on 0.
    node_sub = graph.subgraph([0, 1])
    assert node_sub.number_of_edges() == 2
    assert node_sub.number_of_selfloops() == 1

    # Edge-induced subgraph made of the two self-loops.
    edge_sub = graph.edge_subgraph([(0, 0), (2, 2)])
    assert edge_sub.number_of_edges() == 2
    assert edge_sub.number_of_selfloops() == 2

    # Deep copy and view copy both report the same count.
    copied = graph.copy()
    assert copied.number_of_selfloops() == 2
    view = graph.copy(as_view=True)
    assert view.number_of_selfloops() == 2

    # Removals: dropping node 0 removes its self-loop; then drop 2-2.
    graph.remove_node(0)
    assert graph.number_of_selfloops() == 1
    graph.remove_edge(2, 2)
    assert graph.number_of_selfloops() == 0


@pytest.mark.usefixtures("graphscope_session")
class TestEdgeSubgraph(_TestEdgeSubgraph):
Expand Down
22 changes: 22 additions & 0 deletions python/graphscope/framework/dag_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,28 @@ def create_graph_view(graph, view_type):
return op


def clear_graph(graph):
    """Create clear graph operation for nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph. Its ``graph_type`` must be
            ``types_pb2.DYNAMIC_PROPERTY``; this is enforced with
            ``check_argument``.

    Returns:
        An op of type ``types_pb2.CLEAR_GRAPH`` whose config carries the
        graph's key under ``types_pb2.GRAPH_NAME``.
    """
    check_argument(graph.graph_type == types_pb2.DYNAMIC_PROPERTY)
    op_config = {types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key)}
    return Operation(
        graph.session_id,
        types_pb2.CLEAR_GRAPH,
        config=op_config,
        output_types=types_pb2.GRAPH,
    )


def clear_edges(graph):
"""Create clear edges operation for nx graph.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@
"source": [
"from graphscope.framework.loader import Loader\n",
"graph = graphscope.g()\n",
"graph = graph.add_vertex(Loader(\"/home/jovyan/datasets/ldbc_sample/person_0_0.csv\", delimiter=\"|\"), \"person\"\n",
" .add_edges(Loader(\"/home/jovyan/datasets/ldbc_sample/person_knows_person_0_0.csv\", delimiter=\"|\"),\n",
"graph = graph.add_vertices(Loader(\"/home/jovyan/datasets/ldbc_sample/person_0_0.csv\", delimiter=\"|\"), \"person\")\n",
"graph = graph.add_edges(Loader(\"/home/jovyan/datasets/ldbc_sample/person_knows_person_0_0.csv\", delimiter=\"|\"),\n",
" \"knows\",\n",
" src_label=\"person\",\n",
" dst_label=\"person\",\n",
" )\n",
" )\n",
"G3 = nx.Graph(graph)"
]
},
Expand Down

0 comments on commit f1c87e3

Please sign in to comment.