Skip to content

Commit

Permalink
Support running built-in app on the property graph by projecting to f…
Browse files Browse the repository at this point in the history
…lattened fragment implicitly (#1258)
  • Loading branch information
lidongze0629 committed Jan 17, 2022
1 parent 0ba3d5e commit 06e3392
Show file tree
Hide file tree
Showing 34 changed files with 712 additions and 625 deletions.
458 changes: 199 additions & 259 deletions analytical_engine/core/fragment/arrow_flattened_fragment.h

Large diffs are not rendered by default.

154 changes: 116 additions & 38 deletions coordinator/gscoordinator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,67 +830,145 @@ def _pre_process_for_output_graph_op(op, op_result_pool, key_to_op, **kwargs):
)


def _pre_process_for_project_to_simple_op(op, op_result_pool, key_to_op, **kwargs):
def _pre_process_for_project_to_simple_op( # noqa: C901
op, op_result_pool, key_to_op, **kwargs
):
# for nx graph
if op.attr[types_pb2.GRAPH_TYPE].graph_type in (
graph_def_pb2.DYNAMIC_PROJECTED,
graph_def_pb2.ARROW_FLATTENED,
):
return
assert len(op.parents) == 1

def _check_v_prop_exists_in_all_v_labels(schema, prop):
    """Return whether every vertex label in ``schema`` carries ``prop``.

    Args:
        schema: Object exposing ``vertex_labels`` (an iterable of labels)
            and ``vertex_property_exists(label, prop)``.
        prop: Name of the vertex property to look up on each label.

    Returns:
        bool: True if the property exists on all vertex labels, which is
        also the result when the schema has no vertex labels; False as
        soon as one label lacks it.
    """
    # all() short-circuits at the first label lacking the property, so the
    # schema is not queried again once the answer is known to be False.
    return all(
        schema.vertex_property_exists(v_label, prop)
        for v_label in schema.vertex_labels
    )

def _check_e_prop_exists_in_all_e_labels(schema, prop):
    """Return whether every edge label in ``schema`` carries ``prop``.

    Args:
        schema: Object exposing ``edge_labels`` (an iterable of labels)
            and ``edge_property_exists(label, prop)``.
        prop: Name of the edge property to look up on each label.

    Returns:
        bool: True if the property exists on all edge labels, which is
        also the result when the schema has no edge labels; False as
        soon as one label lacks it.
    """
    # all() short-circuits at the first label lacking the property, so the
    # schema is not queried again once the answer is known to be False.
    return all(
        schema.edge_property_exists(e_label, prop)
        for e_label in schema.edge_labels
    )

# get parent graph schema
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
r = op_result_pool[key_of_parent_op]
schema = GraphSchema()
schema.from_graph_def(r.graph_def)
graph_name = r.graph_def.key
check_argument(
schema.vertex_label_num == 1,
"Cannot project to simple, vertex label number is not one.",
)
check_argument(
schema.edge_label_num == 1,
"Cannot project to simple, edge label number is not one.",
)
v_label = schema.vertex_labels[0]
e_label = schema.edge_labels[0]
relation = (v_label, v_label)
check_argument(
relation in schema.get_relationships(e_label),
f"Cannot project to simple, Graph doesn't contain such relationship: {v_label} -> {e_label} <- {v_label}.",
)
v_props = schema.get_vertex_properties(v_label)
e_props = schema.get_edge_properties(e_label)
check_argument(len(v_props) <= 1)
check_argument(len(e_props) <= 1)
v_label_id = schema.get_vertex_label_id(v_label)
e_label_id = schema.get_edge_label_id(e_label)
v_prop_id, vdata_type = (v_props[0].id, v_props[0].type) if v_props else (-1, None)
e_prop_id, edata_type = (e_props[0].id, e_props[0].type) if e_props else (-1, None)
oid_type = schema.oid_type
vid_type = schema.vid_type

if schema.vertex_label_num == 0:
raise RuntimeError(
"Failed to project to simple graph as no vertex exists in this graph."
)
if schema.edge_label_num == 0:
raise RuntimeError(
"Failed to project to simple graph as no edge exists in this graph."
)

need_flatten_graph = False
if schema.vertex_label_num > 1 or schema.edge_label_num > 1:
need_flatten_graph = True

# check and get vertex property
v_prop = op.attr[types_pb2.V_PROP_KEY].s.decode("utf-8")
if v_prop == "None":
v_prop_id = -1
v_prop_type = graph_def_pb2.NULLVALUE
if not need_flatten_graph:
# for projected graph
# if there is only one property on the label, use this property
v_label = schema.vertex_labels[0]
if schema.vertex_properties_num(v_label) == 1:
v_prop = schema.get_vertex_properties(v_label)[0]
v_prop_id = v_prop.id
v_prop_type = v_prop.type
else:
# v_prop must exist in all vertex labels
if not _check_v_prop_exists_in_all_v_labels(schema, v_prop):
raise RuntimeError(
"Property {0} doesn't exists in all vertex labels".format(v_prop)
)
# get vertex property id
v_prop_id = schema.get_vertex_property_id(schema.vertex_labels[0], v_prop)
# get vertex property type
v_prop_type = graph_def_pb2.NULLVALUE
v_props = schema.get_vertex_properties(schema.vertex_labels[0])
for v_prop in v_props:
if v_prop.id == v_prop_id:
v_prop_type = v_prop.type
break

# check and get edge property
e_prop = op.attr[types_pb2.E_PROP_KEY].s.decode("utf-8")
if e_prop == "None":
e_prop_id = -1
e_prop_type = graph_def_pb2.NULLVALUE
if not need_flatten_graph:
# for projected graph
# if there is only one property on the label, use this property
e_label = schema.edge_labels[0]
if schema.edge_properties_num(e_label) == 1:
e_prop = schema.get_edge_properties(e_label)[0]
e_prop_id = e_prop.id
e_prop_type = e_prop.type
else:
# e_prop must exist in all edge labels
if not _check_e_prop_exists_in_all_e_labels(schema, e_prop):
raise RuntimeError(
"Property {0} doesn't exists in all edge labels".format(e_prop)
)
# get edge property id
e_prop_id = schema.get_edge_property_id(schema.edge_labels[0], e_prop)
# get edge property type
e_props = schema.get_edge_properties(schema.edge_labels[0])
e_prop_type = graph_def_pb2.NULLVALUE
for e_prop in e_props:
if e_prop.id == e_prop_id:
e_prop_type = e_prop.type
break

op.attr[types_pb2.GRAPH_NAME].CopyFrom(
attr_value_pb2.AttrValue(s=graph_name.encode("utf-8"))
)
op.attr[types_pb2.GRAPH_TYPE].CopyFrom(
utils.graph_type_to_attr(graph_def_pb2.ARROW_PROJECTED)
)
op.attr[types_pb2.V_LABEL_ID].CopyFrom(utils.i_to_attr(v_label_id))
op.attr[types_pb2.V_PROP_ID].CopyFrom(utils.i_to_attr(v_prop_id))
op.attr[types_pb2.E_LABEL_ID].CopyFrom(utils.i_to_attr(e_label_id))
op.attr[types_pb2.E_PROP_ID].CopyFrom(utils.i_to_attr(e_prop_id))
op.attr[types_pb2.OID_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(oid_type))
utils.s_to_attr(utils.data_type_to_cpp(schema.oid_type))
)
op.attr[types_pb2.VID_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(vid_type))
utils.s_to_attr(utils.data_type_to_cpp(schema.vid_type))
)
op.attr[types_pb2.V_DATA_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(vdata_type))
utils.s_to_attr(utils.data_type_to_cpp(v_prop_type))
)
op.attr[types_pb2.E_DATA_TYPE].CopyFrom(
utils.s_to_attr(utils.data_type_to_cpp(edata_type))
utils.s_to_attr(utils.data_type_to_cpp(e_prop_type))
)
if need_flatten_graph:
op.attr[types_pb2.GRAPH_TYPE].CopyFrom(
utils.graph_type_to_attr(graph_def_pb2.ARROW_FLATTENED)
)
op.attr[types_pb2.V_PROP_KEY].CopyFrom(utils.s_to_attr(str(v_prop_id)))
op.attr[types_pb2.E_PROP_KEY].CopyFrom(utils.s_to_attr(str(e_prop_id)))
else:
v_label = schema.vertex_labels[0]
e_label = schema.edge_labels[0]
relation = (v_label, v_label)
check_argument(
relation in schema.get_relationships(e_label),
f"Cannot project to simple, Graph doesn't contain such relationship: {v_label} -> {e_label} <- {v_label}.",
)
v_label_id = schema.get_vertex_label_id(v_label)
e_label_id = schema.get_edge_label_id(e_label)
op.attr[types_pb2.GRAPH_TYPE].CopyFrom(
utils.graph_type_to_attr(graph_def_pb2.ARROW_PROJECTED)
)
op.attr[types_pb2.V_LABEL_ID].CopyFrom(utils.i_to_attr(v_label_id))
op.attr[types_pb2.V_PROP_ID].CopyFrom(utils.i_to_attr(v_prop_id))
op.attr[types_pb2.E_LABEL_ID].CopyFrom(utils.i_to_attr(e_label_id))
op.attr[types_pb2.E_PROP_ID].CopyFrom(utils.i_to_attr(e_prop_id))


def _pre_process_for_project_op(op, op_result_pool, key_to_op, **kwargs):
Expand Down
9 changes: 9 additions & 0 deletions docs/analytics_engine.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,24 @@ property graph or not is described in its docstring.

.. autosummary::

- :func:`average_degree_connectivity`
- :func:`average_shortest_path_length`
- :func:`attribute_assortativity_coefficient`
- :func:`bfs`
- :func:`clustering`
- :func:`degree_centrality`
- :func:`degree_assortativity_coefficient`
- :func:`eigenvector_centrality`
- :func:`hits`
- :func:`is_simple_path`
- :func:`k_core`
- :func:`k_shell`
- :func:`katz_centrality`
- :func:`louvain`
- :func:`lpa`
- :func:`numeric_assortativity_coefficient`
- :func:`pagerank`
- :func:`pagerank_nx`
- :func:`sssp`
- :func:`triangles`
- :func:`wcc`
Expand Down
9 changes: 9 additions & 0 deletions docs/zh/analytics_engine.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,24 @@ GraphScope 图分析引擎内置了许多常用的图分析算法,包括连通

.. autosummary::

- :func:`average_degree_connectivity`
- :func:`average_shortest_path_length`
- :func:`attribute_assortativity_coefficient`
- :func:`bfs`
- :func:`clustering`
- :func:`degree_centrality`
- :func:`degree_assortativity_coefficient`
- :func:`eigenvector_centrality`
- :func:`hits`
- :func:`is_simple_path`
- :func:`k_core`
- :func:`k_shell`
- :func:`katz_centrality`
- :func:`louvain`
- :func:`lpa`
- :func:`numeric_assortativity_coefficient`
- :func:`pagerank`
- :func:`pagerank_nx`
- :func:`sssp`
- :func:`triangles`
- :func:`wcc`
Expand Down
2 changes: 1 addition & 1 deletion python/graphscope/analytical/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from graphscope.analytical.app.lpa import lpa
from graphscope.analytical.app.lpa import lpa_u2i
from graphscope.analytical.app.pagerank import pagerank
from graphscope.analytical.app.pagerank_nx import pagerank_nx
from graphscope.analytical.app.pagerank import pagerank_nx
from graphscope.analytical.app.sssp import sssp
from graphscope.analytical.app.triangles import triangles
from graphscope.analytical.app.wcc import wcc
88 changes: 39 additions & 49 deletions python/graphscope/analytical/app/attribute_assortativity.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,81 +28,71 @@

@project_to_simple
@not_compatible_for("arrow_property")
def attribute_assortativity_coefficient(G):
def attribute_assortativity_coefficient(graph, attribute):
"""Compute assortativity for node attributes.
Assortativity measures the similarity of connections
in the graph with respect to the given attribute.
Assortativity measures the similarity of connections in the graph with
respect to the given attribute.
Parameters
----------
G : NetworkX graph
Args:
graph (:class:`graphscope.Graph`): A simple graph.
attribute (str): Node attribute key.
Returns
-------
r: float
Assortativity of graph for given attribute
Returns:
r (float): Assortativity of graph for given attribute
Examples
--------
.. code:: python
Notes:
This computes Eq. (2) in Ref. [1]_ , (trace(M)-sum(M^2))/(1-sum(M^2)),
where M is the joint probability distribution (mixing matrix)
of the specified attribute.
import graphscope as gs
sess = gs.session()
g = sess.g()
pg = g.project(vertices={"vlabel": []}, edges={"elabel": []})
r = gs.attribute_assortativity_coefficient(pg)
s.close()
References:
[1] M. E. J. Newman, Mixing patterns in networks, Physical Review E, 67 026126, 2003
Notes
-----
This computes Eq. (2) in Ref. [1]_ , (trace(M)-sum(M^2))/(1-sum(M^2)),
where M is the joint probability distribution (mixing matrix)
of the specified attribute.
Examples:
References
----------
.. [1] M. E. J. Newman, Mixing patterns in networks,
Physical Review E, 67 026126, 2003
"""
.. code:: python
>>> import graphscope
>>> from graphscope.dataset import load_modern_graph
>>> sess = graphscope.session(cluster_type="hosts", mode="eager")
>>> g = load_modern_graph(sess)
>>> g.schema
>>> c = graphscope.attribute_assortativity_coefficient(g, attribute="name")
>>> sess.close()
"""
ctx = AppAssets(algo="attribute_assortativity_coefficient", context="tensor")(
G, False
graph, False
)
return ctx.to_numpy("r", axis=0)[0]


@project_to_simple
@not_compatible_for("arrow_property")
def numeric_assortativity_coefficient(G):
def numeric_assortativity_coefficient(graph, attribute):
"""Compute assortativity for numerical node attributes.
Assortativity measures the similarity of connections
in the graph with respect to the given numeric attribute.
Parameters
----------
G : NetworkX graph
attribute : string
Node attribute key.
Args:
graph (:class:`graphscope.Graph`): A simple graph.
attribute (str): Node attribute key.
Returns
-------
r: float
Assortativity of graph for given attribute
Returns:
r (float): Assortativity of graph for given attribute
Examples
--------
.. code:: python
import graphscope as gs
sess = gs.session()
g = sess.g()
pg = g.project(vertices={"vlabel": []}, edges={"elabel": []})
r = gs.numeric_assortativity_coefficient(pg)
s.close()
>>> import graphscope
>>> from graphscope.dataset import load_modern_graph
>>> sess = graphscope.session(cluster_type="hosts", mode="eager")
>>> g = load_modern_graph(sess)
>>> g.schema
>>> c = graphscope.numeric_assortativity_coefficient(g, attribute="name")
>>> sess.close()
Notes
-----
Expand All @@ -116,6 +106,6 @@ def numeric_assortativity_coefficient(G):
"""

ctx = AppAssets(algo="attribute_assortativity_coefficient", context="tensor")(
G, True
graph, True
)
return ctx.to_numpy("r", axis=0)[0]
Loading

0 comments on commit 06e3392

Please sign in to comment.