Skip to content

Commit

Permalink
Unload unreachable op after run (#916)
Browse files Browse the repository at this point in the history
* Unload unreachable op after run
  • Loading branch information
siyuan0322 committed Oct 29, 2021
1 parent 0bc8393 commit a29a964
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 32 deletions.
6 changes: 2 additions & 4 deletions analytical_engine/core/object/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ class ObjectManager {
}

bl::result<void> RemoveObject(const std::string& id) {
if (objects.find(id) == objects.end()) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Object " + id + " does not exist");
if (objects.find(id) != objects.end()) {
objects.erase(id);
}
objects.erase(id);
return {};
}

Expand Down
38 changes: 31 additions & 7 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import atexit
import base64
import contextlib
import copy
import json
import logging
import os
Expand All @@ -31,8 +30,8 @@
import sys
import threading
import time
import uuid
import warnings
from queue import Empty as EmptyQueue

try:
from kubernetes import client as kube_client
Expand All @@ -51,13 +50,10 @@
from graphscope.deploy.hosts.cluster import HostsClusterLauncher
from graphscope.deploy.kubernetes.cluster import KubernetesClusterLauncher
from graphscope.framework.dag import Dag
from graphscope.framework.errors import ConnectionError
from graphscope.framework.errors import FatalError
from graphscope.framework.errors import GRPCError
from graphscope.framework.errors import InteractiveEngineInternalError
from graphscope.framework.errors import InvalidArgumentError
from graphscope.framework.errors import K8sError
from graphscope.framework.errors import check_argument
from graphscope.framework.graph import Graph
from graphscope.framework.graph import GraphDAGNode
from graphscope.framework.operation import Operation
Expand Down Expand Up @@ -187,7 +183,7 @@ def _rebuild_gremlin_results(
result_set_dag_node = self._fetches[seq]
return ResultSet(result_set_dag_node)

def wrapper_results(self, response: message_pb2.RunStepResponse):
def wrap_results(self, response: message_pb2.RunStepResponse):
rets = list()
for seq, op in enumerate(self._ops):
for op_result in response.results:
Expand Down Expand Up @@ -235,6 +231,26 @@ def wrapper_results(self, response: message_pb2.RunStepResponse):
break
return rets[0] if self._unpack else rets

def get_dag_for_unload(self):
"""Unload operations (graph, app, context) in dag which are not
existed in fetches.
"""
unload_dag = op_def_pb2.DagDef()
keys_of_fetches = set([op.key for op in self._ops])
mapping = {
types_pb2.CREATE_GRAPH: types_pb2.UNLOAD_GRAPH,
types_pb2.CREATE_APP: types_pb2.UNLOAD_APP,
types_pb2.RUN_APP: types_pb2.UNLOAD_CONTEXT,
}
for op_def in self._sub_dag.op:
if op_def.op in mapping and op_def.key not in keys_of_fetches:
unload_op_def = op_def_pb2.OpDef(
op=mapping[op_def.op], key=uuid.uuid4().hex
)
unload_op_def.parents.extend([op_def.key])
unload_dag.op.extend([unload_op_def])
return unload_dag


class Session(object):
"""A class for interacting with GraphScope graph computation service cluster.
Expand Down Expand Up @@ -936,7 +952,15 @@ def run(self, fetches, debug=False):
except FatalError:
self.close()
raise
return fetch_handler.wrapper_results(response)
if not self.eager():
# Unload operations that cannot be touched anymore
dag_to_unload = fetch_handler.get_dag_for_unload()
try:
self._grpc_client.run(dag_to_unload)
except FatalError:
self.close()
raise
return fetch_handler.wrap_results(response)

def _connect(self):
if self._config_params["addr"] is not None:
Expand Down
35 changes: 14 additions & 21 deletions python/graphscope/framework/dag_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,17 @@ def create_subgraph(graph, nodes=None, edges=None):
return op


def create_unload_op(session_id, op_type, inputs):
"""Uility method to create a unload `Operation` based on op type and op."""
op = Operation(
session_id,
op_type,
inputs=inputs,
output_types=types_pb2.NULL_OUTPUT,
)
return op


def unload_app(app):
"""Unload a loaded app.
Expand All @@ -686,13 +697,7 @@ def unload_app(app):
Returns:
An op to unload the `app`.
"""
op = Operation(
app.session_id,
types_pb2.UNLOAD_APP,
inputs=[app.op],
output_types=types_pb2.NULL_OUTPUT,
)
return op
return create_unload_op(app.session_id, types_pb2.UNLOAD_APP, [app.op])


def unload_graph(graph):
Expand All @@ -704,23 +709,11 @@ def unload_graph(graph):
Returns:
An op to unload the `graph`.
"""
op = Operation(
graph.session_id,
types_pb2.UNLOAD_GRAPH,
inputs=[graph.op],
output_types=types_pb2.NULL_OUTPUT,
)
return op
return create_unload_op(graph.session_id, types_pb2.UNLOAD_GRAPH, [graph.op])


def unload_context(context):
op = Operation(
context.session_id,
types_pb2.UNLOAD_CONTEXT,
inputs=[context.op],
output_types=types_pb2.NULL_OUTPUT,
)
return op
return create_unload_op(context.session_id, types_pb2.UNLOAD_CONTEXT, [context.op])


def context_to_numpy(context, selector=None, vertex_range=None, axis=0):
Expand Down

0 comments on commit a29a964

Please sign in to comment.