[good-first-issue] Move save_to/load_from from client side to coordinator (#2917)

[good-first-issue] Currently, graph serialization and deserialization are performed on the client side; they should be done on the coordinator.
Related issue: #1982

---------

Co-authored-by: Tao He <sighingnow@gmail.com>
zhanglei1949 and sighingnow committed Jun 21, 2023
1 parent c1007cf commit f0a779e
Showing 5 changed files with 164 additions and 70 deletions.
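
For orientation, a hedged end-to-end sketch of the user-facing API this commit reroutes. The session setup, graph construction, and path are hypothetical placeholders; only save_to and load_from are taken from the diff below, assuming (as the graph.py hunks suggest) that both methods live on graphscope.framework.graph.Graph.

import graphscope
from graphscope.framework.graph import Graph

sess = graphscope.session(cluster_type="hosts")  # hypothetical local deployment
g = sess.g()  # placeholder for a property graph built in this session
g.save_to("/tmp/serialized_graph")  # now emits a SERIALIZE_GRAPH op executed by the coordinator
g2 = Graph.load_from("/tmp/serialized_graph", sess)  # emits DESERIALIZE_GRAPH and returns a new graph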
2 changes: 2 additions & 0 deletions coordinator/gscoordinator/dag_manager.py
@@ -75,6 +75,8 @@ class DAGManager(object):
_coordinator_split_op = [
types_pb2.DATA_SOURCE, # spawn an io stream to read/write data from/to vineyard
types_pb2.DATA_SINK, # spawn an io stream to read/write data from/to vineyard
types_pb2.SERIALIZE_GRAPH, # serialize graph to path via vineyard
types_pb2.DESERIALIZE_GRAPH, # deserialize graph from path via vineyard
]

def __init__(self, request_iterator: Sequence[message_pb2.RunStepRequest]):
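
The rest of DAGManager is not part of this diff; as a hedged sketch of what the extended _coordinator_split_op list implies (an assumption, not the project's actual splitting code), the routing decision looks roughly like this:

def runs_on_coordinator(op, coordinator_ops):
    # Sketch only: SERIALIZE_GRAPH and DESERIALIZE_GRAPH now join DATA_SOURCE and
    # DATA_SINK in the set of ops the coordinator executes itself instead of
    # forwarding to the analytical engine.
    return op.op in coordinator_ops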
108 changes: 108 additions & 0 deletions coordinator/gscoordinator/op_executor.py
@@ -739,12 +739,120 @@ def run_on_coordinator(self, dag_def, dag_bodies, loader_op_bodies):
op_result = self._process_data_source(op, dag_bodies, loader_op_bodies)
elif op.op == types_pb2.DATA_SINK:
op_result = self._process_data_sink(op)
elif op.op == types_pb2.SERIALIZE_GRAPH:
op_result = self._process_serialize_graph(op)
elif op.op == types_pb2.DESERIALIZE_GRAPH:
op_result = self._process_deserialize_graph(op)
else:
raise RuntimeError("Unsupported op type: " + str(op.op))
response_head.results.append(op_result)
self._op_result_pool[op.key] = op_result
return message_pb2.RunStepResponse(head=response_head), []

def _process_serialize_graph(self, op: op_def_pb2.OpDef):
try:
import vineyard
import vineyard.io
except ImportError:
raise RuntimeError(
"Saving context to locations requires 'vineyard', "
"please install those two dependencies via "
"\n"
"\n"
" pip3 install vineyard vineyard-io"
"\n"
"\n"
)
storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode())
engine_config = self.get_analytical_engine_config()
if self._launcher.type() == types_pb2.HOSTS:
vineyard_endpoint = engine_config["vineyard_rpc_endpoint"]
else:
vineyard_endpoint = self._launcher._vineyard_internal_endpoint
vineyard_ipc_socket = engine_config["vineyard_socket"]
deployment, hosts = self._launcher.get_vineyard_stream_info()
path = op.attr[types_pb2.GRAPH_SERIALIZATION_PATH].s.decode()
obj_id = op.attr[types_pb2.VINEYARD_ID].i
logger.info("serialize graph %d to %s", obj_id, path)
vineyard.io.serialize(
path,
vineyard.ObjectID(obj_id),
type="global",
vineyard_ipc_socket=vineyard_ipc_socket,
vineyard_endpoint=vineyard_endpoint,
storage_options=storage_options,
deployment=deployment,
hosts=hosts,
)
logger.info("Finish serialization")
return op_def_pb2.OpResult(code=OK, key=op.key)

def _process_deserialize_graph(self, op: op_def_pb2.OpDef):
try:
import vineyard
import vineyard.io
except ImportError:
raise RuntimeError(
"Saving context to locations requires 'vineyard', "
"please install those two dependencies via "
"\n"
"\n"
" pip3 install vineyard vineyard-io"
"\n"
"\n"
)
storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode())
engine_config = self.get_analytical_engine_config()
if self._launcher.type() == types_pb2.HOSTS:
vineyard_endpoint = engine_config["vineyard_rpc_endpoint"]
else:
vineyard_endpoint = self._launcher._vineyard_internal_endpoint
vineyard_ipc_socket = engine_config["vineyard_socket"]
deployment, hosts = self._launcher.get_vineyard_stream_info()
path = op.attr[types_pb2.GRAPH_SERIALIZATION_PATH].s.decode()
logger.info("Deserialize graph from %s", path)
graph_id = vineyard.io.deserialize(
path,
type="global",
vineyard_ipc_socket=vineyard_ipc_socket,
vineyard_endpoint=vineyard_endpoint,
storage_options=storage_options,
deployment=deployment,
hosts=hosts,
)
logger.info("Finish deserialization, graph id: %d", graph_id)
# create graph_def
# run create graph on analytical engine
create_graph_op = create_single_op_dag(
types_pb2.CREATE_GRAPH,
config={
types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(
graph_def_pb2.ARROW_PROPERTY
),
types_pb2.OID_TYPE: utils.s_to_attr("int64_t"),
types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(True),
types_pb2.VINEYARD_ID: utils.i_to_attr(int(graph_id)),
},
)
try:
response_head, response_body = self.run_on_analytical_engine(
create_graph_op, [], {}
)
except grpc.RpcError as e:
logger.error(
"Create graph failed, code: %s, details: %s",
e.code().name,
e.details(),
)
if e.code() == grpc.StatusCode.INTERNAL:
raise AnalyticalEngineInternalError(e.details())
else:
raise
logger.info("response head, %s , body %s", response_head, response_body)
response_head.head.results[0].key = op.key
return response_head.head.results[0]
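
Both new handlers recover storage_options from a JSON string attribute that the client packs into the op; a small self-contained illustration of that round trip, with hypothetical option keys:

import json

kwargs = {"key": "<access-key>", "secret": "<secret-key>"}  # hypothetical OSS/S3 credentials passed to save_to()/load_from()
packed = json.dumps(kwargs)  # what the client stores under types_pb2.STORAGE_OPTIONS
storage_options = json.loads(packed)  # what the handlers above decode and forward to vineyard.io
assert storage_options == kwargs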

def _process_data_sink(self, op: op_def_pb2.OpDef):
import vineyard
import vineyard.io
44 changes: 44 additions & 0 deletions python/graphscope/framework/dag_utils.py
@@ -1103,3 +1103,47 @@ def archive_graph(graph, path):
output_types=types_pb2.NULL_OUTPUT,
)
return op


def save_graph_to(
graph,
path: str,
vineyard_id,
**kwargs,
):
"""Serialize graph to the specified location
Args:
graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source graph.
path (str): The path to serialize the graph, on each worker.
Returns:
An op to serialize the graph to a path.
"""
config = {
types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path),
types_pb2.VINEYARD_ID: utils.i_to_attr(vineyard_id),
types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
}
op = Operation(
graph.session_id,
types_pb2.SERIALIZE_GRAPH,
config=config,
inputs=[graph.op],
output_types=types_pb2.NULL_OUTPUT,
)
return op


def load_graph_from(path: str, sess, **kwargs):
"""Deserialize a graph from the specified location.
Args:
path (str): The path where the graph was serialized, on each worker.
sess (:class:`graphscope.Session`): The session that the graph will be loaded into.
Returns:
An op to deserialize the graph from a path.
"""
config = {
types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path),
types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
}
op = Operation(
sess.session_id,
types_pb2.DESERIALIZE_GRAPH,
config=config,
output_types=types_pb2.GRAPH,
)
return op
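
A hedged usage sketch of the two new helpers; they are normally invoked from Graph.save_to / Graph.load_from (see the graph.py hunks below) rather than called directly, and g, sess, and the path here are hypothetical:

from graphscope.framework import dag_utils
from graphscope.framework.graph import GraphDAGNode

# `g` is an existing graph node and `sess` its session (hypothetical objects).
save_op = dag_utils.save_graph_to(g, "/tmp/serialized_graph", g._vineyard_id)
sess.dag.add_op(save_op)  # mirrors what Graph.save_to does below

load_op = dag_utils.load_graph_from("/tmp/serialized_graph", sess)
new_graph = sess._wrapper(GraphDAGNode(sess, load_op))  # mirrors Graph.load_from below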
75 changes: 5 additions & 70 deletions python/graphscope/framework/graph.py
@@ -1078,42 +1078,10 @@ def save_to(self, path, **kwargs):
Args:
path (str): supported storages are local, hdfs, oss, s3
"""
try:
import vineyard
import vineyard.io
except ImportError:
raise RuntimeError(
"Saving context to locations requires 'vineyard', "
"please install those two dependencies via "
"\n"
"\n"
" pip3 install vineyard vineyard-io"
"\n"
"\n"
)

sess = self._session
deployment = "kubernetes" if sess.info["type"] == "k8s" else "ssh"
conf = sess.info["engine_config"]
vineyard_endpoint = conf["vineyard_rpc_endpoint"]
vineyard_ipc_socket = conf["vineyard_socket"]
if sess.info["type"] == "k8s":
hosts = [
"{}:{}".format(sess.info["namespace"], s)
for s in sess.info["engine_hosts"].split(",")
]
else: # type == "hosts"
hosts = sess.info["engine_hosts"].split(",")
vineyard.io.serialize(
path,
vineyard.ObjectID(self._vineyard_id),
type="global",
vineyard_ipc_socket=vineyard_ipc_socket,
vineyard_endpoint=vineyard_endpoint,
storage_options=kwargs,
deployment=deployment,
hosts=hosts,
)
op = dag_utils.save_graph_to(self, path, self._vineyard_id, **kwargs)
self._session.dag.add_op(op)
return self._session._wrapper(op)

@classmethod
def load_from(cls, path, sess, **kwargs):
@@ -1131,41 +1099,8 @@ def load_from(cls, path, sess, **kwargs):
`Graph`: A new graph object. Schema and data are supposed to be
identical to those of the graph that called the serialize method.
"""
try:
import vineyard
import vineyard.io
except ImportError:
raise RuntimeError(
"Saving context to locations requires 'vineyard', "
"please install those two dependencies via "
"\n"
"\n"
" pip3 install vineyard vineyard-io"
"\n"
"\n"
)

deployment = "kubernetes" if sess.info["type"] == "k8s" else "ssh"
conf = sess.info["engine_config"]
vineyard_endpoint = conf["vineyard_rpc_endpoint"]
vineyard_ipc_socket = conf["vineyard_socket"]
if sess.info["type"] == "k8s":
hosts = [
"{}:{}".format(sess.info["namespace"], s)
for s in sess.info["engine_hosts"].split(",")
]
else: # type == "hosts"
hosts = sess.info["engine_hosts"].split(",")
graph_id = vineyard.io.deserialize(
path,
type="global",
vineyard_ipc_socket=vineyard_ipc_socket,
vineyard_endpoint=vineyard_endpoint,
storage_options=kwargs,
deployment=deployment,
hosts=hosts,
)
return sess._wrapper(GraphDAGNode(sess, vineyard.ObjectID(graph_id)))
op = dag_utils.load_graph_from(path, sess, **kwargs)
return sess._wrapper(GraphDAGNode(sess, op))

def archive(self, path):
"""Archive graph gar format files base on the graph info.
5 changes: 5 additions & 0 deletions python/graphscope/proto/types.proto
@@ -104,6 +104,8 @@ enum OperationType {
INDUCE_SUBGRAPH = 22; // induce subgraph
UNLOAD_CONTEXT = 23; // unload context
ARCHIVE_GRAPH = 24; // archive graph
SERIALIZE_GRAPH = 25; // serialize graph
DESERIALIZE_GRAPH = 26; // deserialize graph

SUBGRAPH = 32; // subgraph in interactive query
GREMLIN_QUERY = 33; // queries on gremlin engine
@@ -252,6 +254,9 @@ enum ParamKey {
CHUNK_TYPE = 342;

GRAPH_LIBRARY_PATH = 400;
// serialization path
GRAPH_SERIALIZATION_PATH = 401;


VFORMAT = 500; // vertex input format
EFORMAT = 501; // edge input format
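
Once the Python bindings are regenerated, these additions surface through types_pb2, which is how both the client helpers and the coordinator handlers above refer to them; a minimal sanity check, assuming the generated module lives at graphscope.proto.types_pb2:

from graphscope.proto import types_pb2

# The enum values match the proto definitions added in this commit.
assert types_pb2.SERIALIZE_GRAPH == 25
assert types_pb2.DESERIALIZE_GRAPH == 26
assert types_pb2.GRAPH_SERIALIZATION_PATH == 401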
