Skip to content

Commit

Permalink
Add unload graph/app test cases for lazy mode execution (#428)
Browse files Browse the repository at this point in the history
Add unload graph/app test cases for lazy mode execution
  • Loading branch information
lidongze0629 committed Jun 24, 2021
1 parent f3a4850 commit efb0728
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 120 deletions.
1 change: 1 addition & 0 deletions coordinator/gscoordinator/dag_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class DAGManager(object):
types_pb2.PROJECT_TO_SIMPLE, # need loaded graph schema information
types_pb2.ADD_COLUMN, # need ctx result
types_pb2.UNLOAD_GRAPH, # need loaded graph information
types_pb2.UNLOAD_APP, # need loaded app information
]

def __init__(self, dag_def: op_def_pb2.DagDef):
Expand Down
9 changes: 9 additions & 0 deletions coordinator/gscoordinator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ def op_pre_process(op, op_result_pool, key_to_op):
types_pb2.TO_VINEYARD_DATAFRAME,
):
_pre_process_for_context_op(op, op_result_pool, key_to_op)
if op.op == types_pb2.UNLOAD_APP:
_pre_process_for_unload_app_op(op, op_result_pool, key_to_op)


def _pre_process_for_add_labels_op(op, op_result_pool, key_to_op):
Expand Down Expand Up @@ -440,6 +442,13 @@ def _pre_process_for_unload_graph_op(op, op_result_pool, key_to_op):
op.attr[types_pb2.VINEYARD_ID].CopyFrom(utils.i_to_attr(vy_info.vineyard_id))


def _pre_process_for_unload_app_op(op, op_result_pool, key_to_op):
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
result = op_result_pool[key_of_parent_op]
op.attr[types_pb2.APP_NAME].CopyFrom(utils.s_to_attr(result.result.decode("utf-8")))


def _pre_process_for_add_column_op(op, op_result_pool, key_to_op):
for key_of_parent_op in op.parents:
parent_op = key_to_op[key_of_parent_op]
Expand Down
34 changes: 25 additions & 9 deletions python/graphscope/framework/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ def wrapper(*args, **kwargs):


class AppAssets(DAGNode):
"""A class holds the bytes of the gar resource.
"""A class represents a app assert node in a DAG that holds the bytes of the gar resource.
Assets includes name (for builtin algorithm), and gar (for user defined algorithm),
and its type (one of `cpp_pie`, `cython_pie`, `cython_pregel`.
The instance of this class can be passed to init :class:`graphscope.App`.
The instance of this class can be passed to init :class:`graphscope.framework.app.AppDAGNode`.
Attributes:
algo (str): Name of the algorithm
Expand Down Expand Up @@ -148,7 +148,7 @@ def __init__(self, algo, gar=None, **kwargs):
self._op = create_app(self)

def __repr__(self) -> str:
return f"graphscope.AppAssets <type: {self._type}, algorithm: {self._algo}>"
return f"graphscope.framework.app.AppAssets <type: {self._type}, algorithm: {self._algo}>"

@property
def algo(self):
Expand Down Expand Up @@ -193,6 +193,7 @@ def signature(self):
"""Generate a signature of the app assets by its algo name (and gar resources).
Used to uniquely identify a app assets.
Returns:
str: signature of this assets
"""
Expand Down Expand Up @@ -245,7 +246,7 @@ def __call__(self, graph, *args, **kwargs):


class AppDAGNode(DAGNode):
"""App node in a DAG.
"""A class represents a app node in a DAG.
An application that can run on graphs and produce results.
Expand Down Expand Up @@ -330,8 +331,13 @@ def __call__(self, *args, **kwargs):
return ContextDAGNode(self, self._graph, *args, **kwargs)

def unload(self):
# do nothing for dag node
pass
"""Unload this app from graphscope engine.
Returns:
:class:`graphscope.framework.app.UnloadedApp`: Evaluated in eager mode.
"""
op = unload_app(self)
return UnloadedApp(self._session, op)


class App(object):
Expand Down Expand Up @@ -367,15 +373,25 @@ def signature(self):

def unload(self):
"""Unload app. Both on engine side and python side. Set the key to None."""
op = unload_app(self)
op.eval()
rlt = self._session._wrapper(self._app_node.unload())
self._key = None
self._session = None
return rlt

def __call__(self, *args, **kwargs):
return self._session._wrapper(self._app_node(*args, **kwargs))


class UnloadedApp(DAGNode):
"""Unloaded app node in a DAG."""

def __init__(self, session, op):
self._session = session
self._op = op
# add op to dag
self._session.dag.add_op(self._op)


def load_app(algo, gar=None, **kwargs):
"""Load an app from gar.
bytes orthe resource of the specified path or bytes.
Expand All @@ -387,7 +403,7 @@ def load_app(algo, gar=None, **kwargs):
str represent the path of resource.
Returns:
Instance of <graphscope.AppAssets>
Instance of <graphscope.framework.app.AppAssets>
Raises:
FileNotFoundError: File not exist.
Expand Down
6 changes: 4 additions & 2 deletions python/graphscope/framework/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ class Dag(object):
"""

def __init__(self):
# the order in which op joins the dag, starting by 1.
self._seq = 1
# mapping from op's key to op
self._ops_by_key = dict()
self._ops_seq_by_key = dict()

Expand Down Expand Up @@ -149,7 +151,7 @@ def evaluated(self, value):

@property
def session(self):
"""Get the session that the graph belogs to."""
"""Get the session that the dag node belongs to."""
assert self._session is not None
return self._session

Expand All @@ -159,6 +161,6 @@ def session(self, value):

@property
def session_id(self):
"""Get the session id that the graph belogs to."""
"""Get the session id that the dag node belongs to."""
assert self._session is not None
return self._session.session_id
11 changes: 3 additions & 8 deletions python/graphscope/framework/dag_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,14 +639,14 @@ def unload_app(app):
"""Unload a loaded app.
Args:
app (:class:`App`): The app to unload.
app (:class:`AppDAGNode`): The app to unload.
Returns:
An op to unload the `app`.
"""
config = {}
op = Operation(
app._session_id,
app.session_id,
types_pb2.UNLOAD_APP,
config=config,
inputs=[app.op],
Expand All @@ -659,17 +659,12 @@ def unload_graph(graph):
"""Unload a graph.
Args:
graph (:class:`Graph`): The graph to unload.
graph (:class:`GraphDAGNode`): The graph to unload.
Returns:
An op to unload the `graph`.
"""
config = {}
if not isinstance(graph, DAGNode):
config.update({types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key)})
# Dynamic graph doesn't have a vineyard id
if hasattr(graph, "vineyard_id"):
config[types_pb2.VINEYARD_ID] = utils.i_to_attr(graph.vineyard_id)
op = Operation(
graph.session_id,
types_pb2.UNLOAD_GRAPH,
Expand Down

0 comments on commit efb0728

Please sign in to comment.