Skip to content

Commit

Permalink
Polymorphic ContextDAGNode to guide selector for user (#461)
Browse files Browse the repository at this point in the history
* Polymorphic ContextDAGNode to guide selector for user
* update doc
* Wraps to make docs correct.
* support graph to numpy/dataframe in lazy mode
* fix udf error and upload gie log in local ci

Signed-off-by: DongZe Li <9546726@qq.com>
Co-authored-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
lidongze0629 and sighingnow committed Jul 5, 2021
1 parent ba47539 commit e0395fb
Show file tree
Hide file tree
Showing 36 changed files with 890 additions and 375 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ jobs:
rm -rf ~/.cargo
python3 -m pytest -s -v /tmp/test_run_locally.py
- name: Upload GIE Log
if: always()
uses: actions/upload-artifact@v2
with:
name: gie-log
path: /tmp/graphscope/

- name: Setup tmate session debug
if: false
uses: mxschmitt/action-tmate@v2
2 changes: 2 additions & 0 deletions coordinator/gscoordinator/dag_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class DAGManager(object):
types_pb2.RUN_APP, # need loaded app
types_pb2.CONTEXT_TO_NUMPY, # need loaded graph to transform selector
types_pb2.CONTEXT_TO_DATAFRAME, # need loaded graph to transform selector
types_pb2.GRAPH_TO_NUMPY, # need loaded graph to transform selector
types_pb2.GRAPH_TO_DATAFRAME, # need loaded graph to transform selector
types_pb2.TO_VINEYARD_TENSOR, # need loaded graph to transform selector
types_pb2.TO_VINEYARD_DATAFRAME, # need loaded graph to transform selector
types_pb2.PROJECT_GRAPH, # need loaded graph to transform selector
Expand Down
171 changes: 167 additions & 4 deletions coordinator/gscoordinator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@
import yaml
from graphscope.framework import utils
from graphscope.framework.graph_schema import GraphSchema
from graphscope.framework.utils import transform_labeled_vertex_data_selector
from graphscope.framework.utils import transform_labeled_vertex_property_data_selector
from graphscope.framework.utils import transform_vertex_data_selector
from graphscope.framework.utils import transform_vertex_property_data_selector
from graphscope.proto import attr_value_pb2
from graphscope.proto import graph_def_pb2
from graphscope.proto import op_def_pb2
Expand Down Expand Up @@ -363,6 +359,8 @@ def op_pre_process(op, op_result_pool, key_to_op):
types_pb2.TO_VINEYARD_DATAFRAME,
):
_pre_process_for_context_op(op, op_result_pool, key_to_op)
if op.op in (types_pb2.GRAPH_TO_NUMPY, types_pb2.GRAPH_TO_DATAFRAME):
_pre_process_for_output_graph_op(op, op_result_pool, key_to_op)
if op.op == types_pb2.UNLOAD_APP:
_pre_process_for_unload_app_op(op, op_result_pool, key_to_op)

Expand Down Expand Up @@ -522,6 +520,30 @@ def __backtrack_key_of_graph_op(key):
)


def _pre_process_for_output_graph_op(op, op_result_pool, key_to_op):
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
r = op_result_pool[key_of_parent_op]
schema = GraphSchema()
schema.from_graph_def(r.graph_def)
graph_name = r.graph_def.key
selector = op.attr[types_pb2.SELECTOR].s.decode("utf-8")
if op.op == types_pb2.GRAPH_TO_DATAFRAME:
selector = _tranform_dataframe_selector(
"labeled_vertex_property", schema, selector
)
else:
# to numpy
selector = _tranform_numpy_selector("labeled_vertex_property", schema, selector)
if selector is not None:
op.attr[types_pb2.SELECTOR].CopyFrom(
attr_value_pb2.AttrValue(s=selector.encode("utf-8"))
)
op.attr[types_pb2.GRAPH_NAME].CopyFrom(
attr_value_pb2.AttrValue(s=graph_name.encode("utf-8"))
)


def _pre_process_for_project_to_simple_op(op, op_result_pool, key_to_op):
# for nx graph
if op.attr[types_pb2.GRAPH_TYPE].graph_type == graph_def_pb2.DYNAMIC_PROJECTED:
Expand Down Expand Up @@ -687,6 +709,147 @@ def _tranform_dataframe_selector(context_type, schema, selector):
return json.dumps(selector)


def _transform_vertex_data_v(selector):
if selector not in ("v.id", "v.data"):
raise SyntaxError("selector of v must be 'id' or 'data'")
return selector


def _transform_vertex_data_e(selector):
if selector not in ("e.src", "e.dst", "e.data"):
raise SyntaxError("selector of e must be 'src', 'dst' or 'data'")
return selector


def _transform_vertex_data_r(selector):
if selector != "r":
raise SyntaxError("selector of r must be 'r'")
return selector


def _transform_vertex_property_data_r(selector):
# The second part of selector or r is user defined name.
# So we will allow any str
return selector


def _transform_labeled_vertex_data_v(schema, label, prop):
label_id = schema.get_vertex_label_id(label)
if prop == "id":
return "label{}.{}".format(label_id, prop)
else:
prop_id = schema.get_vertex_property_id(label, prop)
return "label{}.property{}".format(label_id, prop_id)


def _transform_labeled_vertex_data_e(schema, label, prop):
label_id = schema.get_edge_label_id(label)
if prop in ("src", "dst"):
return "label{}.{}".format(label_id, prop)
else:
prop_id = schema.get_vertex_property_id(label, prop)
return "label{}.property{}".format(label_id, prop_id)


def _transform_labeled_vertex_data_r(schema, label):
label_id = schema.get_vertex_label_id(label)
return "label{}".format(label_id)


def _transform_labeled_vertex_property_data_r(schema, label, prop):
label_id = schema.get_vertex_label_id(label)
return "label{}.{}".format(label_id, prop)


def transform_vertex_data_selector(selector):
"""Optional values:
vertex selector: 'v.id', 'v.data'
edge selector: 'e.src', 'e.dst', 'e.data'
result selector: 'r'
"""
if selector is None:
raise RuntimeError("selector cannot be None")
segments = selector.split(".")
if len(segments) > 2:
raise SyntaxError("Invalid selector: %s." % selector)
if segments[0] == "v":
selector = _transform_vertex_data_v(selector)
elif segments[0] == "e":
selector = _transform_vertex_data_e(selector)
elif segments[0] == "r":
selector = _transform_vertex_data_r(selector)
else:
raise SyntaxError("Invalid selector: %s, choose from v / e / r." % selector)
return selector


def transform_vertex_property_data_selector(selector):
"""Optional values:
vertex selector: 'v.id', 'v.data'
edge selector: 'e.src', 'e.dst', 'e.data'
result selector format: 'r.y', y denotes property name.
"""
if selector is None:
raise RuntimeError("selector cannot be None")
segments = selector.split(".")
if len(segments) != 2:
raise SyntaxError("Invalid selector: %s." % selector)
if segments[0] == "v":
selector = _transform_vertex_data_v(selector)
elif segments[0] == "e":
selector = _transform_vertex_data_e(selector)
elif segments[0] == "r":
selector = _transform_vertex_property_data_r(selector)
else:
raise SyntaxError("Invalid selector: %s, choose from v / e / r." % selector)
return selector


def transform_labeled_vertex_data_selector(schema, selector):
"""Formats: 'v:x.y/id', 'e:x.y/src/dst', 'r:label',
x denotes label name, y denotes property name.
Returned selector will change label name to 'label{id}', where id is x's id in labels.
And change property name to 'property{id}', where id is y's id in properties.
"""
if selector is None:
raise RuntimeError("selector cannot be None")

ret_type, segments = selector.split(":")
if ret_type not in ("v", "e", "r"):
raise SyntaxError("Invalid selector: " + selector)
segments = segments.split(".")
ret = ""
if ret_type == "v":
ret = _transform_labeled_vertex_data_v(schema, *segments)
elif ret_type == "e":
ret = _transform_labeled_vertex_data_e(schema, *segments)
elif ret_type == "r":
ret = _transform_labeled_vertex_data_r(schema, *segments)
return "{}:{}".format(ret_type, ret)


def transform_labeled_vertex_property_data_selector(schema, selector):
"""Formats: 'v:x.y/id', 'e:x.y/src/dst', 'r:x.y',
x denotes label name, y denotes property name.
Returned selector will change label name to 'label{id}', where id is x's id in labels.
And change property name to 'property{id}', where id is y's id in properties.
"""
if selector is None:
raise RuntimeError("selector cannot be None")
ret_type, segments = selector.split(":")
if ret_type not in ("v", "e", "r"):
raise SyntaxError("Invalid selector: " + selector)
segments = segments.split(".")
ret = ""
if ret_type == "v":
ret = _transform_labeled_vertex_data_v(schema, *segments)
elif ret_type == "e":
ret = _transform_labeled_vertex_data_e(schema, *segments)
elif ret_type == "r":
ret = _transform_labeled_vertex_property_data_r(schema, *segments)
return "{}:{}".format(ret_type, ret)


def _extract_gar(app_dir: str, attr):
"""Extract gar to workspace
Args:
Expand Down
2 changes: 1 addition & 1 deletion docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ For quickly getting started, we set up a local Kubernetes cluster and take advan
To run GraphScope on your local computer, the following dependencies or tools are required.

- Docker
- Python 3.8 (with pip)
- Python 3.6+ (with pip)
- Local Kubernetes cluster set-up tool (e.g. `Kind <https://kind.sigs.k8s.io>`_)

On Windows and macOS, you can follow the official guides to install them and enable Kubernetes in Docker.
Expand Down
39 changes: 22 additions & 17 deletions docs/reference/app.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,28 @@ AppAssets
.. autoclass:: graphscope.framework.app.AppAssets
:special-members:
:members:
:exclude-members: __call__
:exclude-members: __call__, __repr__


App object
----------

.. currentmodule:: graphscope
.. autoclass:: graphscope.framework.app.AppDAGNode
:members: unload

.. autoclass:: graphscope.framework.app.App
:special-members:
:members:
:exclude-members: __repr__, __call__, _query, __weakref__

Functions
---------
.. autosummary::
:toctree: generated/

graphscope.framework.app.load_app


BuiltIn apps
------------
Expand All @@ -30,19 +51,3 @@ BuiltIn apps
.. autofunction:: graphscope.triangles
.. autofunction:: graphscope.louvain

App object
----------

.. currentmodule:: graphscope
.. autoclass:: graphscope.framework.app.App
:special-members:
:members:
:exclude-members: __repr__, __call__, _query

Functions
---------
.. autosummary::
:toctree: generated/

graphscope.framework.app.load_app

20 changes: 13 additions & 7 deletions docs/reference/context.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,22 @@ Context object
--------------
.. currentmodule:: graphscope.framework.context

.. autoclass:: BaseContext
.. autoclass:: BaseContextDAGNode
:special-members:
:members: to_numpy, to_dataframe, to_vineyard_tensor, to_vineyard_dataframe, output
:members: to_numpy, to_dataframe, to_vineyard_tensor, to_vineyard_dataframe

.. autoclass:: TensorContext
.. autoclass:: TensorContextDAGNode

.. autoclass:: VertexDataContext
.. autoclass:: VertexDataContextDAGNode

.. autoclass:: LabeledVertexDataContext
.. autoclass:: LabeledVertexDataContextDAGNode

.. autoclass:: VertexPropertyContext
.. autoclass:: VertexPropertyContextDAGNode

.. autoclass:: LabeledVertexPropertyContext
.. autoclass:: LabeledVertexPropertyContextDAGNode

.. autoclass:: Context

.. autoclass:: DynamicVertexDataContext

.. autoclass:: ResultDAGNode
4 changes: 4 additions & 0 deletions docs/reference/graph.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ Graph object
------------
.. currentmodule:: graphscope.framework.graph

.. autoclass:: GraphDAGNode
:special-members: __init__
:members: add_vertices, add_edges, add_column, project, unload

.. autoclass:: Graph
:special-members: __init__
:members:
Expand Down
2 changes: 1 addition & 1 deletion docs/zh/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ GraphScope 被设计为运行在 Kubernetes 管理的群集上。
本地运行 GraphScope 需要预先安装以下依赖。

- Docker
- Python 3.8 (with pip)
- Python 3.6+ (with pip)
- Local Kubernetes cluster set-up tool (e.g. `Kind <https://kind.sigs.k8s.io>`_)

对于 Windows 和 MacOS 的用户,可通过官方文档来安装上述依赖, 并在Docker中开启Kubernetes功能。
Expand Down
10 changes: 6 additions & 4 deletions python/graphscope/analytical/app/bfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def bfs(graph, src=0):
src (int, optional): Source vertex of breadth first search. Defaults to 0.
Returns:
:class:`VertexDataContext`: a context with each vertex with a distance from the source.
:class:`graphscope.framework.context.VertexDataContextDAGNode`:
A context with each vertex with a distance from the source, evaluated in eager mode.
Examples:
Expand All @@ -48,7 +49,7 @@ def bfs(graph, src=0):
s.close()
"""
return AppAssets(algo="bfs")(graph, src)
return AppAssets(algo="bfs", context="vertex_data")(graph, src)


@not_compatible_for("dynamic_property", "arrow_projected", "dynamic_projected")
Expand All @@ -60,7 +61,8 @@ def property_bfs(graph, src=0):
src (int, optional): Source vertex of breadth first search. Defaults to 0.
Returns:
:class:`LabeledVertexPropertyContext`: A context with each vertex with a distance from the source.
:class:`graphscope.framework.context.LabeledVertexDataContextDAGNode`:
A context with each vertex with a distance from the source, evaluated in eager mode.
Examples:
Expand All @@ -73,4 +75,4 @@ def property_bfs(graph, src=0):
s.close()
"""
return AppAssets(algo="property_bfs")(graph, src)
return AppAssets(algo="property_bfs", context="labeled_vertex_data")(graph, src)
5 changes: 3 additions & 2 deletions python/graphscope/analytical/app/cdlp.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def cdlp(graph, max_round=10):
max_round (int, optional): Maximum rounds. Defaults to 10.
Returns:
:class:`VertexDataContext`: A context with each vertex assigned with a community ID.
:class:`graphscope.framework.context.VertexDataContextDAGNode`:
A context with each vertex assigned with a community ID, evaluated in eager mode.
Examples:
Expand All @@ -49,4 +50,4 @@ def cdlp(graph, max_round=10):
"""
max_round = int(max_round)
return AppAssets(algo="cdlp")(graph, max_round)
return AppAssets(algo="cdlp", context="vertex_data")(graph, max_round)
5 changes: 3 additions & 2 deletions python/graphscope/analytical/app/clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def clustering(graph):
graph (:class:`Graph`): A projected simple graph.
Returns:
:class:`VertexDataContext`: A context with each vertex assigned the computed clustering value.
:class:`graphscope.framework.context.VertexDataContextDAGNode`:
A context with each vertex assigned the computed clustering value, evaluated in eager mode.
Examples:
Expand All @@ -49,4 +50,4 @@ def clustering(graph):
s.close()
"""
return AppAssets(algo="clustering")(graph)
return AppAssets(algo="clustering", context="vertex_data")(graph)
Loading

0 comments on commit e0395fb

Please sign in to comment.