diff --git a/coordinator/gscoordinator/dag_manager.py b/coordinator/gscoordinator/dag_manager.py index 4a86432b0150..274a75d88c77 100644 --- a/coordinator/gscoordinator/dag_manager.py +++ b/coordinator/gscoordinator/dag_manager.py @@ -43,6 +43,7 @@ class DAGManager(object): types_pb2.PROJECT_TO_SIMPLE, # need loaded graph schema information types_pb2.ADD_COLUMN, # need ctx result types_pb2.UNLOAD_GRAPH, # need loaded graph information + types_pb2.UNLOAD_APP, # need loaded app information ] def __init__(self, dag_def: op_def_pb2.DagDef): diff --git a/coordinator/gscoordinator/utils.py b/coordinator/gscoordinator/utils.py index 2b5078a1be07..0920ceffc5b7 100644 --- a/coordinator/gscoordinator/utils.py +++ b/coordinator/gscoordinator/utils.py @@ -363,6 +363,8 @@ def op_pre_process(op, op_result_pool, key_to_op): types_pb2.TO_VINEYARD_DATAFRAME, ): _pre_process_for_context_op(op, op_result_pool, key_to_op) + if op.op == types_pb2.UNLOAD_APP: + _pre_process_for_unload_app_op(op, op_result_pool, key_to_op) def _pre_process_for_add_labels_op(op, op_result_pool, key_to_op): @@ -440,6 +442,13 @@ def _pre_process_for_unload_graph_op(op, op_result_pool, key_to_op): op.attr[types_pb2.VINEYARD_ID].CopyFrom(utils.i_to_attr(vy_info.vineyard_id)) +def _pre_process_for_unload_app_op(op, op_result_pool, key_to_op): + assert len(op.parents) == 1 + key_of_parent_op = op.parents[0] + result = op_result_pool[key_of_parent_op] + op.attr[types_pb2.APP_NAME].CopyFrom(utils.s_to_attr(result.result.decode("utf-8"))) + + def _pre_process_for_add_column_op(op, op_result_pool, key_to_op): for key_of_parent_op in op.parents: parent_op = key_to_op[key_of_parent_op] diff --git a/python/graphscope/framework/app.py b/python/graphscope/framework/app.py index 3e06632fc10d..d7b4f440561f 100644 --- a/python/graphscope/framework/app.py +++ b/python/graphscope/framework/app.py @@ -113,12 +113,12 @@ def wrapper(*args, **kwargs): class AppAssets(DAGNode): - """A class holds the bytes of the gar resource. + """A class represents a app assert node in a DAG that holds the bytes of the gar resource. Assets includes name (for builtin algorithm), and gar (for user defined algorithm), and its type (one of `cpp_pie`, `cython_pie`, `cython_pregel`. - The instance of this class can be passed to init :class:`graphscope.App`. + The instance of this class can be passed to init :class:`graphscope.framework.app.AppDAGNode`. Attributes: algo (str): Name of the algorithm @@ -148,7 +148,7 @@ def __init__(self, algo, gar=None, **kwargs): self._op = create_app(self) def __repr__(self) -> str: - return f"graphscope.AppAssets " + return f"graphscope.framework.app.AppAssets " @property def algo(self): @@ -193,6 +193,7 @@ def signature(self): """Generate a signature of the app assets by its algo name (and gar resources). Used to uniquely identify a app assets. + Returns: str: signature of this assets """ @@ -245,7 +246,7 @@ def __call__(self, graph, *args, **kwargs): class AppDAGNode(DAGNode): - """App node in a DAG. + """A class represents a app node in a DAG. An application that can run on graphs and produce results. @@ -330,8 +331,13 @@ def __call__(self, *args, **kwargs): return ContextDAGNode(self, self._graph, *args, **kwargs) def unload(self): - # do nothing for dag node - pass + """Unload this app from graphscope engine. + + Returns: + :class:`graphscope.framework.app.UnloadedApp`: Evaluated in eager mode. + """ + op = unload_app(self) + return UnloadedApp(self._session, op) class App(object): @@ -367,15 +373,25 @@ def signature(self): def unload(self): """Unload app. Both on engine side and python side. Set the key to None.""" - op = unload_app(self) - op.eval() + rlt = self._session._wrapper(self._app_node.unload()) self._key = None self._session = None + return rlt def __call__(self, *args, **kwargs): return self._session._wrapper(self._app_node(*args, **kwargs)) +class UnloadedApp(DAGNode): + """Unloaded app node in a DAG.""" + + def __init__(self, session, op): + self._session = session + self._op = op + # add op to dag + self._session.dag.add_op(self._op) + + def load_app(algo, gar=None, **kwargs): """Load an app from gar. bytes orthe resource of the specified path or bytes. @@ -387,7 +403,7 @@ def load_app(algo, gar=None, **kwargs): str represent the path of resource. Returns: - Instance of + Instance of Raises: FileNotFoundError: File not exist. diff --git a/python/graphscope/framework/dag.py b/python/graphscope/framework/dag.py index d4c1f65d730f..395f2bcc5ba3 100644 --- a/python/graphscope/framework/dag.py +++ b/python/graphscope/framework/dag.py @@ -34,7 +34,9 @@ class Dag(object): """ def __init__(self): + # the order in which op joins the dag, starting by 1. self._seq = 1 + # mapping from op's key to op self._ops_by_key = dict() self._ops_seq_by_key = dict() @@ -149,7 +151,7 @@ def evaluated(self, value): @property def session(self): - """Get the session that the graph belogs to.""" + """Get the session that the dag node belongs to.""" assert self._session is not None return self._session @@ -159,6 +161,6 @@ def session(self, value): @property def session_id(self): - """Get the session id that the graph belogs to.""" + """Get the session id that the dag node belongs to.""" assert self._session is not None return self._session.session_id diff --git a/python/graphscope/framework/dag_utils.py b/python/graphscope/framework/dag_utils.py index c9d03a61ba92..5b6c1dc1293c 100644 --- a/python/graphscope/framework/dag_utils.py +++ b/python/graphscope/framework/dag_utils.py @@ -639,14 +639,14 @@ def unload_app(app): """Unload a loaded app. Args: - app (:class:`App`): The app to unload. + app (:class:`AppDAGNode`): The app to unload. Returns: An op to unload the `app`. """ config = {} op = Operation( - app._session_id, + app.session_id, types_pb2.UNLOAD_APP, config=config, inputs=[app.op], @@ -659,17 +659,12 @@ def unload_graph(graph): """Unload a graph. Args: - graph (:class:`Graph`): The graph to unload. + graph (:class:`GraphDAGNode`): The graph to unload. Returns: An op to unload the `graph`. """ config = {} - if not isinstance(graph, DAGNode): - config.update({types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key)}) - # Dynamic graph doesn't have a vineyard id - if hasattr(graph, "vineyard_id"): - config[types_pb2.VINEYARD_ID] = utils.i_to_attr(graph.vineyard_id) op = Operation( graph.session_id, types_pb2.UNLOAD_GRAPH, diff --git a/python/graphscope/framework/graph.py b/python/graphscope/framework/graph.py index 76a814b05091..6a08864d0eaf 100644 --- a/python/graphscope/framework/graph.py +++ b/python/graphscope/framework/graph.py @@ -95,6 +95,33 @@ def load_from(cls, path, sess, **kwargs): def project(self, vertices, edges): raise NotImplementedError + def _from_nx_graph(self, g): + """Create a gs graph from a nx graph. + Args: + g (:class:`graphscope.nx.graph`): A nx graph that contains graph data. + + Raises: + RuntimeError: NX graph and gs graph not in the same session. + TypeError: Convert a graph view of nx graph to gs graph. + + Returns: :class:`graphscope.framework.operation.Operation` + that will be used to construct a :class:`graphscope.Graph` + + Examples: + .. code:: python + + >>> import graphscope as gs + >>> nx_g = gs.nx.path_graph(10) + >>> gs_g = gs.Graph(nx_g) + """ + if self.session_id != g.session_id: + raise RuntimeError( + "networkx graph and graphscope graph not in the same session." + ) + if hasattr(g, "_graph"): + raise TypeError("graph view can not convert to gs graph") + return dag_utils.dynamic_to_arrow(g) + def _from_vineyard(self, vineyard_object): """Load a graph from a already existed vineyard graph. @@ -103,7 +130,8 @@ def _from_vineyard(self, vineyard_object): or :class:`vineyard.ObjectName`): vineyard object, which represents a graph. - Returns: :class:`Operation` + Returns: + :class:`graphscope.framework.operation.Operation` """ if isinstance(vineyard_object, vineyard.Object): return self._construct_op_from_vineyard_id(vineyard_object.id) @@ -154,7 +182,33 @@ def _construct_op_of_empty_graph(self): class GraphDAGNode(DAGNode, GraphInterface): - """Graph node in a DAG.""" + """A class represents a graph node in a DAG. + + In GraphScope, all operations that generate a new graph will return + a instance of :class:`GraphDAGNode`, which will be automatically + executed by :method:`sess.run` in `eager` mode. + + The following example demonstrates its usage: + + .. code:: python + + >>> # lazy mode + >>> import graphscope as gs + >>> sess = gs.session(mode="lazy") + >>> g = sess.g() + >>> g1 = g.add_vertices("person.csv","person") + >>> print(g1) # + >>> g2 = sess.run(g1) + >>> print(g2) # + + >>> # eager mode + >>> import graphscope as gs + >>> sess = gs.session(mode="eager") + >>> g = sess.g() + >>> g1 = g.add_vertices("person.csv","person") + >>> print(g1) # + >>> g1.unload() + """ def __init__( self, @@ -170,10 +224,9 @@ def __init__( session (:class:`Session`): A graphscope session instance. incoming_data: Graph can be initialized through various type of sources, which can be one of: - - - :class:`Operation` - - :class:`nx.Graph` - - :class:`Graph` + - :class:`graphscope.framework.operation.Operation` + - :class:`graphscope.nx.Graph` + - :class:`graphscope.Graph` - :class:`vineyard.Object`, :class:`vineyard.ObjectId` or :class:`vineyard.ObjectName` oid_type: (str, optional): Type of vertex original id. Defaults to "int64". directed: (bool, optional): Directed graph or not. Defaults to True. @@ -200,10 +253,6 @@ def __init__( self._resolve_op(incoming_data) self._session.dag.add_op(self._op) - def __del__(self): - # TODO think about the DAGNode has been run. - pass - @property def v_labels(self): return self._v_labels @@ -237,6 +286,14 @@ def graph_type(self): """ return self._graph_type + def _project_to_simple(self): + check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY) + op = dag_utils.project_arrow_property_graph_to_simple(self) + # construct dag node + graph_dag_node = GraphDAGNode(self._session, op) + graph_dag_node._base_graph = self + return graph_dag_node + def _resolve_op(self, incoming_data): # Don't import the :code:`NXGraph` in top-level statements to improve the # performance of :code:`import graphscope`. @@ -266,6 +323,21 @@ def _resolve_op(self, incoming_data): raise RuntimeError("Not supported incoming data.") def add_vertices(self, vertices, label="_", properties=None, vid_field=0): + """Add vertices to the graph, and return a new graph. + + Args: + vertices (Union[str, Loader]): Vertex data source. + label (str, optional): Vertex label name. Defaults to "_". + properties (list[str], optional): List of column names loaded as properties. Defaults to None. + vid_field (int or str, optional): Column index or property name used as id field. Defaults to 0. + + Raises: + ValueError: If the given value is invalid or conflict with current graph. + + Returns: + :class:`graphscope.framework.graph.GraphDAGNode`: + A new graph with vertex added, evaluated in eager mode. + """ if label in self._v_labels: raise ValueError(f"Label {label} already existed in graph.") if not self._v_labels and self._e_labels: @@ -307,6 +379,40 @@ def add_edges( src_field=0, dst_field=1, ): + """Add edges to the graph, and return a new graph. + + 1. Add edges to a uninitialized graph. + + i. src_label and dst_label both unspecified. In this case, current graph must + has 0 (we deduce vertex label from edge table, and set vertex label name to '_'), + or 1 vertex label (we set src_label and dst label to this). + ii. src_label and dst_label both specified and existed in current graph's vertex labels. + iii. src_label and dst_label both specified and there is no vertex labels in current graph. + we deduce all vertex labels from edge tables. + Note that you either provide all vertex labels, or let graphscope deduce all vertex labels. + We don't support mixed style. + + 2. Add edges to a existed graph. + Must add a new kind of edge label, not a new relation to builded graph. + But you can add a new relation to uninitialized part of the graph. + src_label and dst_label must be specified and existed in current graph. + + Args: + edges (Union[str, Loader]): Edge data source. + label (str, optional): Edge label name. Defaults to "_". + properties (list[str], optional): List of column names loaded as properties. Defaults to None. + src_label (str, optional): Source vertex label. Defaults to None. + dst_label (str, optional): Destination vertex label. Defaults to None. + src_field (int, optional): Column index or name used as src field. Defaults to 0. + dst_field (int, optional): Column index or name used as dst field. Defaults to 1. + + Raises: + ValueError: If the given value is invalid or conflict with current graph. + + Returns: + :class:`graphscope.framework.graph.GraphDAGNode`: + A new graph with edge added, evaluated in eager mode. + """ if src_label is None and dst_label is None: check_argument( len(self._v_labels) <= 1, @@ -407,30 +513,6 @@ def add_edges( graph_dag_node._base_graph = parent return graph_dag_node - def _from_nx_graph(self, incoming_graph): - """Create a gs graph from a nx graph. - Args: - incoming_graph (:class:`nx.graph`): A nx graph that contains graph data. - - Returns: - that will be used to construct a gs.Graph - - Raises: - TypeError: Raise Error if graph type not match. - - Examples: - >>> nx_g = nx.path_graph(10) - >>> gs_g = gs.Graph(nx_g) - """ - if self.session_id != incoming_graph.session_id: - raise RuntimeError( - "networkx graph and graphscope graph not in the same session." - ) - if hasattr(incoming_graph, "_graph"): - msg = "graph view can not convert to gs graph" - raise TypeError(msg) - return dag_utils.dynamic_to_arrow(incoming_graph) - def _backtrack_graph_dag_node_by_op_key(self, key): if self.op.key == key: return self @@ -441,6 +523,18 @@ def _backtrack_graph_dag_node_by_op_key(self, key): graph_dag_node = graph_dag_node._base_graph def add_column(self, results, selector): + """Add the results as a column to the graph. Modification rules are given by the selector. + + Args: + results (:class:`graphscope.framework.context.ContextDAGNode`): + A context that created by doing an app query on a graph, and holds the corresponding results. + selector (dict): Select results to add as column. + Format is similar to selectors in :class:`graphscope.framework.context.Context` + + Returns: + :class:`graphscope.framework.graph.GraphDAGNode`: + A new graph with new columns, evaluated in eager mode. + """ check_argument( isinstance(selector, Mapping), "selector of add column must be a dict" ) @@ -451,14 +545,35 @@ def add_column(self, results, selector): return graph_dag_node def unload(self): - # do nothing for dag node - pass + """Unload this graph from graphscope engine. + + Returns: + :class:`graphscope.framework.graph.UnloadedGraph`: Evaluated in eager mode. + """ + op = dag_utils.unload_graph(self) + return UnloadedGraph(self._session, op) def project( self, vertices: Mapping[str, Union[List[str], None]], edges: Mapping[str, Union[List[str], None]], ): + """Project a subgraph from the property graph, and return a new graph. + A graph produced by project just like a normal property graph, and can be projected further. + + Args: + vertices (dict): + key is the vertex label name, the value is a list of str, which represents the + name of properties. Specifically, it will select all properties if value is None. + Note that, the label of the vertex in all edges you want to project should be included. + edges (dict): + key is the edge label name, the value is a list of str, which represents the + name of properties. Specifically, it will select all properties if value is None. + + Returns: + :class:`graphscope.framework.graph.GraphDAGNode`: + A new graph projected from the property graph, evaluated in eager mode. + """ check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY) op = dag_utils.project_arrow_property_graph( self, json.dumps(vertices), json.dumps(edges) @@ -468,14 +583,6 @@ def project( graph_dag_node._base_graph = self return graph_dag_node - def _project_to_simple(self): - check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY) - op = dag_utils.project_arrow_property_graph_to_simple(self) - # construct dag node - graph_dag_node = GraphDAGNode(self._session, op) - graph_dag_node._base_graph = self - return graph_dag_node - class Graph(GraphInterface): """A class for representing metadata of a graph in the GraphScope. @@ -490,7 +597,6 @@ class Graph(GraphInterface): .. code:: python >>> import graphscope as gs - >>> from graphscope.framework.loader import Loader >>> sess = gs.session() >>> graph = sess.g() >>> graph = graph.add_vertices("person.csv","person") @@ -705,25 +811,17 @@ def unload(self): self._close_learning_instances() except Exception as e: logger.error("Failed to close learning instances: %s" % e) + rlt = None if not self._detached: - op = dag_utils.unload_graph(self) - op.eval() + rlt = self._session._wrapper(self._graph_node.unload()) self._key = None self._session = None + return rlt def _project_to_simple(self): return self._session._wrapper(self._graph_node._project_to_simple()) def add_column(self, results, selector): - """Add the results as a column to the graph. Modification rules are given by the selector. - - Args: - results (:class:`Context`): A `Context` that created by doing a query. - selector (dict): Select results to add as column. Format is similar to selectors in `Context` - - Returns: - :class:`Graph`: A new `Graph` with new columns. - """ return self._session._wrapper(self._graph_node.add_column(results, selector)) def to_numpy(self, selector, vertex_range=None): @@ -892,38 +990,6 @@ def add_edges( src_field=0, dst_field=1, ): - """Add edges to graph. - 1. Add edges to a uninitialized graph. - - i. src_label and dst_label both unspecified. In this case, current graph must - has 0 (we deduce vertex label from edge table, and set vertex label name to '_'), - or 1 vertex label (we set src_label and dst label to this). - ii. src_label and dst_label both specified and existed in current graph's vertex labels. - iii. src_label and dst_label both specified and there is no vertex labels in current graph. - we deduce all vertex labels from edge tables. - Note that you either provide all vertex labels, or let graphscope deduce all vertex labels. - We don't support mixed style. - - 2. Add edges to a existed graph. - Must add a new kind of edge label, not a new relation to builded graph. - But you can add a new relation to uninitialized part of the graph. - src_label and dst_label must be specified and existed in current graph. - - Args: - edges (Union[str, Loader]): Edge data source. - label (str, optional): Edge label name. Defaults to "_". - properties (list[str], optional): List of column names loaded as properties. Defaults to None. - src_label (str, optional): Source vertex label. Defaults to None. - dst_label (str, optional): Destination vertex label. Defaults to None. - src_field (int, optional): Column index or name used as src field. Defaults to 0. - dst_field (int, optional): Column index or name used as dst field. Defaults to 1. - - Raises: - ValueError: If the given value is invalid or conflict with current graph. - - Returns: - Graph: A new graph with edge added, not yet evaluated. - """ if not self.loaded(): raise RuntimeError("The graph is not loaded") return self._session._wrapper( @@ -940,3 +1006,13 @@ def project( if not self.loaded(): raise RuntimeError("The graph is not loaded") return self._session._wrapper(self._graph_node.project(vertices, edges)) + + +class UnloadedGraph(DAGNode): + """Unloaded graph node in a DAG.""" + + def __init__(self, session, op): + self._session = session + self._op = op + # add op to dag + self._session.dag.add_op(self._op) diff --git a/python/graphscope/framework/operation.py b/python/graphscope/framework/operation.py index 81221db42964..807c8b94d778 100644 --- a/python/graphscope/framework/operation.py +++ b/python/graphscope/framework/operation.py @@ -30,12 +30,12 @@ class Operation(object): """Represents a dag op that performs computation on tensors. - For example :code:`c = run_app(a, b)` creates an :code:`Operation` of type - "RunApp" that takes operation :code:`a` and :code:`b` as input, and produces :code:`c` - as output. + For example :code:`g2 = g1.add_vertices("path")` create an :code:`Operation` of type + "ADD_LABELS" that takes operation of :code:`g1` as input, and produces a graph dag node + :code:`g2` which contains this operation as output. - After the dag has been launched in a session, an `Operation` can - be executed by passing it to :code:`graphscope.Session.run`. + After the dag has been launched in a session, an `Operation` can be executed by + :code`op.eval()` or passing it to :code:`session.run`. """ def __init__( @@ -47,7 +47,7 @@ def __init__( config=None, query_args=None, ): - """Creates an :code:`Operation`. + """Creates an :code:`graphscope.framework.operation.Operation`. Args: op_type: :code:`types_pb2.OperationType` @@ -85,11 +85,12 @@ def __init__( @property def key(self): - """Unique key for each :code:`types_pb2.OpDef`""" + """Unique key for each :code:`op_def_pb2.OpDef`""" return self._op_def.key @property def parents(self): + """A list of :code:`graphscope.framework.operation.Operation`""" return self._parents @property @@ -111,8 +112,11 @@ def output_types(self): @property def signature(self): """Signature of its parents' signatures and its own parameters. - Used to unique identify one `Operation` with fixed configuration, if the configuration - changed, the signature will be changed accordingly. + + Used to unique identify one `Operation` with fixed configuration, + if the configuration changed, the signature will be changed accordingly. + + Note that this method has not been used. """ content = "" for op in self._parents: @@ -124,6 +128,11 @@ def is_leaf_op(self): return self._leaf def eval(self, leaf=True): + """Evaluate by :code:`sess.run`. + + Args: + leaf (bool, optional): Leaf Operation means there is no successor. + """ # NB: to void cycle import # pylint: disable=import-outside-toplevel, cyclic-import from graphscope.client.session import get_session_by_id @@ -135,9 +144,6 @@ def eval(self, leaf=True): res = sess.run(self) return res - def generate_new_key(self): - self._op_def.key = uuid.uuid4().hex - def add_parent(self, op): self._parents.append(op) self._op_def.parents.extend([op.key]) @@ -149,7 +155,10 @@ def __str__(self): return str(self.as_op_def()) def __repr__(self): - return "" % (self.type, self.key) + return "" % ( + self.type, + self.key, + ) def to_json(self): """Get json represented op.""" diff --git a/python/tests/test_lazy.py b/python/tests/test_lazy.py index 5cfe26385a53..d9a23126566a 100644 --- a/python/tests/test_lazy.py +++ b/python/tests/test_lazy.py @@ -33,6 +33,9 @@ from graphscope import property_sssp from graphscope import sssp +from graphscope.framework.app import AppAssets +from graphscope.framework.app import AppDAGNode +from graphscope.framework.errors import AnalyticalEngineInternalError from graphscope.framework.loader import Loader test_repo_dir = os.path.expandvars("${GS_TEST_DIR}") @@ -92,6 +95,91 @@ def test_vertices_omitted_form_loader(sess, student_group_e): assert g2.loaded() +def test_construct_graph_step_by_step(sess): + _g = sess.g(generate_eid=False) + g = sess.run(_g) + _g1 = g.add_vertices(f"{new_property_dir}/twitter_v_0", "v0") + g1 = sess.run(_g1) + _g2 = g1.add_vertices(f"{new_property_dir}/twitter_v_1", "v1") + g2 = sess.run(_g2) + ug = g.unload() + ug1 = g1.unload() + ug2 = g2.unload() + sess.run([ug, ug1, ug2]) + + +def test_unload_graph(sess, student_v, teacher_v, student_group_e): + # case 1 + # 1. load empty g + # 2. unload g + g = sess.g() + ug = g.unload() + assert sess.run(ug) is None + + # case 2 + g = sess.g() + g1 = g.add_vertices(student_v, "student") + g2 = g.add_vertices(teacher_v, "teacher") + ug1 = g1.unload() + ug2 = g2.unload() + assert sess.run(ug1) is None + assert sess.run(ug2) is None + + # case 3 + g = sess.g() + g1 = g.add_vertices(student_v, "student") + g2 = g1.add_vertices(teacher_v, "teacher") + g3 = g2.add_edges( + student_group_e, "group", src_label="student", dst_label="student" + ) + ug = g.unload() + ug1 = g1.unload() + ug2 = g2.unload() + ug3 = g3.unload() + sess.run([ug, ug1, ug2, ug3]) + + # case 4 + # test unload twice + g = sess.g() + ug = g.unload() + assert sess.run(ug) is None + assert sess.run(ug) is None + + +def test_error_using_unload_graph(sess, student_v): + with pytest.raises(AnalyticalEngineInternalError): + g = sess.g() + ug = g.unload() + g1 = g.add_vertices(student_v, "student") + sess.run([ug, g1]) + + +def test_unload_app(sess): + g = arrow_property_graph(sess) + + # case 1 + a1 = AppDAGNode(g, AppAssets(algo="property_sssp")) + ua1 = a1.unload() + assert sess.run(ua1) is None + + # case 2 + # unload app twice + a1 = AppDAGNode(g, AppAssets(algo="property_sssp")) + ua1 = a1.unload() + assert sess.run(ua1) is None + assert sess.run(ua1) is None + + # case 3 + # load app after unload + a1 = AppDAGNode(g, AppAssets(algo="property_sssp")) + ua1 = a1.unload() + assert sess.run(ua1) is None + c1 = a1(src=20) + r1 = c1.to_numpy("r:v0.dist_0") + r = sess.run(r1) + assert r.shape == (40521,) + + def test_context(sess): g = arrow_property_graph(sess) c = property_sssp(g, 20)