Skip to content

Commit

Permalink
Add methods to unload contexts. (#794)
Browse files Browse the repository at this point in the history
* Add unload context method
* Passed unloading context
* Unify the naming of context key/name.
  • Loading branch information
siyuan0322 committed Sep 13, 2021
1 parent b5ba16c commit 3e2fd7d
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 37 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,8 @@ jobs:
export HDFS_HOST=$(hostname -I | awk '{print $1}')
cd ${GITHUB_WORKSPACE}/python
python3 -m pytest ./tests/kubernetes/test_demo_script.py -k test_demo_on_hdfs
# FIXME: context.output file not found
python3 -m pytest -s ./tests/kubernetes/test_demo_script.py -k test_demo_on_hdfs
# Check that the result files have been successfully written to the given location
hdfs dfs -test -e /ldbc_sample/res.csv_0 && hdfs dfs -test -e /ldbc_sample/res.csv_1
- name: Helm test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ class DynamicGraphReporter : public grape::Communicator {
default:
CHECK(false);
}
return std::string("");
}

private:
Expand Down
37 changes: 23 additions & 14 deletions analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ bl::result<std::string> GrapeInstance::query(const rpc::GSParams& params,
{"context_schema", context_schema}});
}

// Unload a context from this instance.
//
// Reads the string parameter stored under rpc::CONTEXT_KEY in `params` and
// asks `object_manager_` to remove the object registered under that key.
// BOOST_LEAF_AUTO returns early with the error if the parameter cannot be
// read; otherwise the result of RemoveObject is handed back to the caller.
bl::result<void> GrapeInstance::unloadContext(const rpc::GSParams& params) {
BOOST_LEAF_AUTO(context_key, params.Get<std::string>(rpc::CONTEXT_KEY));
return object_manager_.RemoveObject(context_key);
}

bl::result<std::string> GrapeInstance::reportGraph(
const rpc::GSParams& params) {
#ifdef NETWORKX
Expand Down Expand Up @@ -330,9 +335,9 @@ bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::contextToNumpy(
BOOST_LEAF_ASSIGN(s_selector, params.Get<std::string>(rpc::SELECTOR));
}

BOOST_LEAF_AUTO(ctx_name, params.Get<std::string>(rpc::CTX_NAME));
BOOST_LEAF_AUTO(context_key, params.Get<std::string>(rpc::CONTEXT_KEY));
BOOST_LEAF_AUTO(base_ctx_wrapper,
object_manager_.GetObject<IContextWrapper>(ctx_name));
object_manager_.GetObject<IContextWrapper>(context_key));

auto ctx_type = base_ctx_wrapper->context_type();

Expand Down Expand Up @@ -374,9 +379,9 @@ bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::contextToNumpy(

bl::result<std::string> GrapeInstance::getContextData(
const rpc::GSParams& params) {
BOOST_LEAF_AUTO(ctx_name, params.Get<std::string>(rpc::CTX_NAME));
BOOST_LEAF_AUTO(context_key, params.Get<std::string>(rpc::CONTEXT_KEY));
BOOST_LEAF_AUTO(base_ctx_wrapper,
object_manager_.GetObject<IContextWrapper>(ctx_name));
object_manager_.GetObject<IContextWrapper>(context_key));

auto wrapper =
std::dynamic_pointer_cast<IVertexDataContextWrapper>(base_ctx_wrapper);
Expand All @@ -397,9 +402,9 @@ bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::contextToDataframe(
BOOST_LEAF_ASSIGN(s_selectors, params.Get<std::string>(rpc::SELECTOR));
}

BOOST_LEAF_AUTO(ctx_name, params.Get<std::string>(rpc::CTX_NAME));
BOOST_LEAF_AUTO(context_key, params.Get<std::string>(rpc::CONTEXT_KEY));
BOOST_LEAF_AUTO(base_ctx_wrapper,
object_manager_.GetObject<IContextWrapper>(ctx_name));
object_manager_.GetObject<IContextWrapper>(context_key));

auto ctx_type = base_ctx_wrapper->context_type();

Expand Down Expand Up @@ -440,9 +445,9 @@ bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::contextToDataframe(

bl::result<std::string> GrapeInstance::contextToVineyardTensor(
const rpc::GSParams& params) {
BOOST_LEAF_AUTO(ctx_name, params.Get<std::string>(rpc::CTX_NAME));
BOOST_LEAF_AUTO(context_key, params.Get<std::string>(rpc::CONTEXT_KEY));
BOOST_LEAF_AUTO(base_ctx_wrapper,
object_manager_.GetObject<IContextWrapper>(ctx_name));
object_manager_.GetObject<IContextWrapper>(context_key));
auto ctx_type = base_ctx_wrapper->context_type();
std::pair<std::string, std::string> range;

Expand Down Expand Up @@ -508,9 +513,9 @@ bl::result<std::string> GrapeInstance::contextToVineyardDataFrame(
const rpc::GSParams& params) {
std::pair<std::string, std::string> range;

BOOST_LEAF_AUTO(ctx_name, params.Get<std::string>(rpc::CTX_NAME));
BOOST_LEAF_AUTO(context_key, params.Get<std::string>(rpc::CONTEXT_KEY));
BOOST_LEAF_AUTO(base_ctx_wrapper,
object_manager_.GetObject<IContextWrapper>(ctx_name));
object_manager_.GetObject<IContextWrapper>(context_key));
if (params.HasKey(rpc::VERTEX_RANGE)) {
BOOST_LEAF_AUTO(range_in_json, params.Get<std::string>(rpc::VERTEX_RANGE));
range = parseRange(range_in_json);
Expand Down Expand Up @@ -574,7 +579,7 @@ bl::result<std::string> GrapeInstance::contextToVineyardDataFrame(
bl::result<rpc::graph::GraphDefPb> GrapeInstance::addColumn(
const rpc::GSParams& params) {
BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
BOOST_LEAF_AUTO(ctx_name, params.Get<std::string>(rpc::CTX_NAME));
BOOST_LEAF_AUTO(context_key, params.Get<std::string>(rpc::CONTEXT_KEY));
BOOST_LEAF_AUTO(s_selectors, params.Get<std::string>(rpc::SELECTOR));
BOOST_LEAF_AUTO(
frag_wrapper,
Expand All @@ -585,7 +590,7 @@ bl::result<rpc::graph::GraphDefPb> GrapeInstance::addColumn(
"AddColumn is only available for ArrowFragment");
}
BOOST_LEAF_AUTO(ctx_wrapper,
object_manager_.GetObject<IContextWrapper>(ctx_name));
object_manager_.GetObject<IContextWrapper>(context_key));
std::string dst_graph_name = "graph_" + generateId();

BOOST_LEAF_AUTO(new_frag_wrapper,
Expand Down Expand Up @@ -932,8 +937,8 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
break;
}
case rpc::RUN_APP: {
BOOST_LEAF_AUTO(ctx_name, query(params, cmd.query_args));
r->set_data(ctx_name);
BOOST_LEAF_AUTO(context_key, query(params, cmd.query_args));
r->set_data(context_key);
break;
}
case rpc::UNLOAD_APP: {
Expand All @@ -944,6 +949,10 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
BOOST_LEAF_CHECK(unloadGraph(params));
break;
}
case rpc::UNLOAD_CONTEXT: {
BOOST_LEAF_CHECK(unloadContext(params));
break;
}
case rpc::REPORT_GRAPH: {
BOOST_LEAF_AUTO(report_in_json, reportGraph(params));
r->set_data(report_in_json,
Expand Down
2 changes: 2 additions & 0 deletions analytical_engine/core/grape_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class GrapeInstance : public Subscriber {
bl::result<std::string> query(const rpc::GSParams& params,
const rpc::QueryArgs& query_args);

bl::result<void> unloadContext(const rpc::GSParams& params);

bl::result<std::string> reportGraph(const rpc::GSParams& params);

bl::result<rpc::graph::GraphDefPb> projectGraph(const rpc::GSParams& params);
Expand Down
6 changes: 3 additions & 3 deletions analytical_engine/core/object/app_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ typedef void* CreateWorkerT(const std::shared_ptr<void>& fragment,
typedef void DeleteWorkerT(void* worker_handler);

typedef void QueryT(void* worker_handler, const rpc::QueryArgs& query_args,
const std::string& ctx_name,
const std::string& context_key,
std::shared_ptr<IFragmentWrapper> frag_wrapper,
std::shared_ptr<IContextWrapper>& ctx_wrapper);

Expand Down Expand Up @@ -88,10 +88,10 @@ class AppEntry : public GSObject {

bl::result<std::shared_ptr<IContextWrapper>> Query(
void* worker_handler, const rpc::QueryArgs& query_args,
const std::string& ctx_name,
const std::string& context_key,
std::shared_ptr<IFragmentWrapper>& frag_wrapper) {
std::shared_ptr<IContextWrapper> ctx_wrapper;
query_(worker_handler, query_args, ctx_name, frag_wrapper, ctx_wrapper);
query_(worker_handler, query_args, context_key, frag_wrapper, ctx_wrapper);
return ctx_wrapper;
}

Expand Down
6 changes: 3 additions & 3 deletions analytical_engine/frame/app_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ void DeleteWorker(void* worker_handler) {
}

void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args,
const std::string& ctx_name,
const std::string& context_key,
std::shared_ptr<gs::IFragmentWrapper> frag_wrapper,
std::shared_ptr<gs::IContextWrapper>& ctx_wrapper) {
auto worker = static_cast<worker_handler_t*>(worker_handler)->worker;
gs::AppInvoker<_APP_TYPE>::Query(worker, query_args);
auto ctx = worker->GetContext();

if (!ctx_name.empty()) {
if (!context_key.empty()) {
ctx_wrapper = gs::CtxWrapperBuilder<typename _APP_TYPE::context_t>::build(
ctx_name, frag_wrapper, ctx);
context_key, frag_wrapper, ctx);
}
}
}
6 changes: 3 additions & 3 deletions analytical_engine/frame/cython_app_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,16 @@ void DeleteWorker(void* worker_handler) {
}

void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args,
const std::string& ctx_name,
const std::string& context_key,
std::shared_ptr<gs::IFragmentWrapper> frag_wrapper,
std::shared_ptr<gs::IContextWrapper>& ctx_wrapper) {
auto worker = static_cast<worker_handler_t*>(worker_handler)->worker;
gs::AppInvoker<_APP_TYPE>::Query(worker, query_args);
auto ctx = worker->GetContext();

if (!ctx_name.empty()) {
if (!context_key.empty()) {
ctx_wrapper = gs::CtxWrapperBuilder<typename _APP_TYPE::context_t>::build(
ctx_name, frag_wrapper, ctx);
context_key, frag_wrapper, ctx);
}
}
}
6 changes: 3 additions & 3 deletions analytical_engine/frame/cython_pie_app_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,16 @@ void DeleteWorker(void* worker_handler) {
}

void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args,
const std::string& ctx_name,
const std::string& context_key,
std::shared_ptr<gs::IFragmentWrapper> frag_wrapper,
std::shared_ptr<gs::IContextWrapper>& ctx_wrapper) {
auto worker = static_cast<worker_handler_t*>(worker_handler)->worker;
gs::AppInvoker<_APP_TYPE>::Query(worker, query_args);
auto ctx = worker->GetContext();

if (!ctx_name.empty()) {
if (!context_key.empty()) {
ctx_wrapper = gs::CtxWrapperBuilder<typename _APP_TYPE::context_t>::build(
ctx_name, frag_wrapper, ctx);
context_key, frag_wrapper, ctx);
}
}
}
17 changes: 15 additions & 2 deletions coordinator/gscoordinator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ def op_pre_process(op, op_result_pool, key_to_op, **kwargs): # noqa: C901
_pre_process_for_output_graph_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.UNLOAD_APP:
_pre_process_for_unload_app_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.UNLOAD_CONTEXT:
_pre_process_for_unload_context_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.CREATE_INTERACTIVE_QUERY:
_pre_process_for_create_interactive_query_op(
op, op_result_pool, key_to_op, **kwargs
Expand Down Expand Up @@ -600,6 +602,17 @@ def _pre_process_for_unload_app_op(op, op_result_pool, key_to_op, **kwargs):
op.attr[types_pb2.APP_NAME].CopyFrom(utils.s_to_attr(result.result.decode("utf-8")))


def _pre_process_for_unload_context_op(op, op_result_pool, key_to_op, **kwargs):
    """Fill in the context key of an unload-context op from its parent's result.

    The op must have exactly one parent. That parent's entry in
    `op_result_pool` is decoded as UTF-8 JSON, and its "context_key" value is
    copied into `op.attr[types_pb2.CONTEXT_KEY]`.

    Args:
        op: The unload-context op; its `attr` map is updated in place.
        op_result_pool: Mapping from op key to the result of that op.
        key_to_op: Unused here; kept for a uniform pre-process signature.
        **kwargs: Unused here.
    """
    assert len(op.parents) == 1
    parent_result = op_result_pool[op.parents[0]]
    decoded = json.loads(parent_result.result.decode("utf-8"))
    ctx_key = decoded["context_key"]
    op.attr[types_pb2.CONTEXT_KEY].CopyFrom(
        attr_value_pb2.AttrValue(s=ctx_key.encode("utf-8"))
    )


def _pre_process_for_add_column_op(op, op_result_pool, key_to_op, **kwargs):
for key_of_parent_op in op.parents:
parent_op = key_to_op[key_of_parent_op]
Expand All @@ -621,7 +634,7 @@ def _pre_process_for_add_column_op(op, op_result_pool, key_to_op, **kwargs):
selector = _tranform_dataframe_selector(context_type, schema, selector)
op.attr[types_pb2.GRAPH_NAME].CopyFrom(utils.s_to_attr(graph_name))
op.attr[types_pb2.GRAPH_TYPE].CopyFrom(utils.graph_type_to_attr(graph_type))
op.attr[types_pb2.CTX_NAME].CopyFrom(utils.s_to_attr(context_key))
op.attr[types_pb2.CONTEXT_KEY].CopyFrom(utils.s_to_attr(context_key))
op.attr[types_pb2.SELECTOR].CopyFrom(utils.s_to_attr(selector))


Expand Down Expand Up @@ -654,7 +667,7 @@ def __backtrack_key_of_graph_op(key):
parent_op_result = json.loads(r.result.decode("utf-8"))
context_key = parent_op_result["context_key"]
context_type = parent_op_result["context_type"]
op.attr[types_pb2.CTX_NAME].CopyFrom(
op.attr[types_pb2.CONTEXT_KEY].CopyFrom(
attr_value_pb2.AttrValue(s=context_key.encode("utf-8"))
)
r = op_result_pool[graph_op.key]
Expand Down
2 changes: 1 addition & 1 deletion proto/op_def.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ message OpResult {

string error_msg = 4;

// result represents app_name or ctx_name or raw bytes If the op returns a NDArray or DataFrame
// result represents app_name or context_key, or raw bytes if the op returns an NDArray or DataFrame
bytes result = 5;

// learning graph handle
Expand Down
3 changes: 2 additions & 1 deletion proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ enum OperationType {
CLEAR_GRAPH = 20; // clear graph
VIEW_GRAPH = 21; // create graph view
INDUCE_SUBGRAPH = 22; // induce subgraph
UNLOAD_CONTEXT = 23; // unload context

CREATE_INTERACTIVE_QUERY = 31; // interactive query
SUBGRAPH = 32; // subgraph in interactive query
Expand Down Expand Up @@ -136,7 +137,7 @@ enum OperationType {
enum ParamKey {
GRAPH_NAME = 0;
DST_GRAPH_NAME = 1;
CTX_NAME = 2;
CONTEXT_KEY = 2;
GRAPH_TYPE = 3;
DST_GRAPH_TYPE = 4;

Expand Down
24 changes: 24 additions & 0 deletions python/graphscope/framework/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ def __init__(self, dag_node, op):
self._session.dag.add_op(self._op)


class UnloadedContext(DAGNode):
    """DAG node that represents a context which has been unloaded."""

    def __init__(self, session, op):
        """Store the session and op, then register the op on the session's DAG.

        Args:
            session: Session whose `dag` receives the op.
            op: The op this node stands for.
        """
        self._session, self._op = session, op
        # Registering the op makes it part of the session's DAG.
        self._session.dag.add_op(self._op)


class BaseContextDAGNode(DAGNode):
"""Base class of concrete context DAG node.
Expand Down Expand Up @@ -241,6 +251,10 @@ def output(self, fd, selector, vertex_range=None, **kwargs):
op = dag_utils.output(df, fd, **kwargs)
return ResultDAGNode(self, op)

def unload(self):
    """Build the DAG node that unloads this context.

    Returns:
        An `UnloadedContext` bound to this node's session and wrapping the op
        produced by `dag_utils.unload_context`.
    """
    unload_op = dag_utils.unload_context(self)
    return UnloadedContext(self._session, unload_op)


class TensorContextDAGNode(BaseContextDAGNode):
"""Tensor context DAG node holds a tensor.
Expand Down Expand Up @@ -540,6 +554,13 @@ def __init__(self, context_node, key, result_schema):
self._context_node.evaluated = True
self._saved_signature = self.signature

def __del__(self):
    """Best-effort unload of this context when the object is finalized."""
    try:
        self.unload()
    except Exception:  # pylint: disable=broad-except
        # Deliberately ignore every error: the session may already be
        # closed or destroyed by the time this runs.
        pass

@property
def op(self):
return self._context_node.op
Expand Down Expand Up @@ -623,6 +644,9 @@ def output_to_client(self, fd, selector, vertex_range=None):
df = self.to_dataframe(selector, vertex_range)
df.to_csv(fd, header=True, index=False)

def unload(self):
    """Unload this context through the owning session.

    Returns:
        Whatever the session's `_wrapper` returns for the unload node created
        by the underlying context node.
    """
    unload_node = self._context_node.unload()
    return self._session._wrapper(unload_node)


class DynamicVertexDataContext(collections.abc.Mapping):
"""Vertex data context for complicated result store.
Expand Down
16 changes: 11 additions & 5 deletions python/graphscope/framework/dag_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,11 +673,9 @@ def unload_app(app):
Returns:
An op to unload the `app`.
"""
config = {}
op = Operation(
app.session_id,
types_pb2.UNLOAD_APP,
config=config,
inputs=[app.op],
output_types=types_pb2.NULL_OUTPUT,
)
Expand All @@ -693,17 +691,25 @@ def unload_graph(graph):
Returns:
An op to unload the `graph`.
"""
config = {}
op = Operation(
graph.session_id,
types_pb2.UNLOAD_GRAPH,
config=config,
inputs=[graph.op],
output_types=types_pb2.NULL_OUTPUT,
)
return op


def unload_context(context):
    """Unload a context.

    Args:
        context: The context to unload; its `session_id` and `op` are used to
            build the operation.

    Returns:
        An op to unload the `context`.
    """
    return Operation(
        context.session_id,
        types_pb2.UNLOAD_CONTEXT,
        inputs=[context.op],
        output_types=types_pb2.NULL_OUTPUT,
    )


def context_to_numpy(context, selector=None, vertex_range=None, axis=0):
"""Retrieve results as a numpy ndarray.
Expand Down Expand Up @@ -838,7 +844,7 @@ def output(result, fd, **kwargs):

def get_context_data(results, node):
config = {
types_pb2.CTX_NAME: utils.s_to_attr(results.key),
types_pb2.CONTEXT_KEY: utils.s_to_attr(results.key),
types_pb2.NODE: utils.s_to_attr(node),
}
op = Operation(
Expand Down

0 comments on commit 3e2fd7d

Please sign in to comment.