diff --git a/coordinator/gscoordinator/dag_manager.py b/coordinator/gscoordinator/dag_manager.py
index 4bb0b609e3e9..a1bae41e18a7 100644
--- a/coordinator/gscoordinator/dag_manager.py
+++ b/coordinator/gscoordinator/dag_manager.py
@@ -75,6 +75,8 @@ class DAGManager(object):
     _coordinator_split_op = [
         types_pb2.DATA_SOURCE,  # spawn an io stream to read/write data from/to vineyard
         types_pb2.DATA_SINK,  # spawn an io stream to read/write data from/to vineyard
+        types_pb2.SERIALIZE_GRAPH,  # serialize graph to path via vineyard
+        types_pb2.DESERIALIZE_GRAPH,  # deserialize graph from path via vineyard
     ]
 
     def __init__(self, request_iterator: Sequence[message_pb2.RunStepRequest]):
diff --git a/coordinator/gscoordinator/op_executor.py b/coordinator/gscoordinator/op_executor.py
index 3606f786bdfe..c3b48cc164a5 100644
--- a/coordinator/gscoordinator/op_executor.py
+++ b/coordinator/gscoordinator/op_executor.py
@@ -739,12 +739,120 @@ def run_on_coordinator(self, dag_def, dag_bodies, loader_op_bodies):
                 op_result = self._process_data_source(op, dag_bodies, loader_op_bodies)
             elif op.op == types_pb2.DATA_SINK:
                 op_result = self._process_data_sink(op)
+            elif op.op == types_pb2.SERIALIZE_GRAPH:
+                op_result = self._process_serialize_graph(op)
+            elif op.op == types_pb2.DESERIALIZE_GRAPH:
+                op_result = self._process_deserialize_graph(op)
             else:
                 raise RuntimeError("Unsupported op type: " + str(op.op))
             response_head.results.append(op_result)
             self._op_result_pool[op.key] = op_result
         return message_pb2.RunStepResponse(head=response_head), []
 
+    def _process_serialize_graph(self, op: op_def_pb2.OpDef):
+        try:
+            import vineyard
+            import vineyard.io
+        except ImportError:
+            raise RuntimeError(
+                "Serializing graphs to locations requires 'vineyard', "
+                "please install those two dependencies via "
+                "\n"
+                "\n"
+                "    pip3 install vineyard vineyard-io"
+                "\n"
+                "\n"
+            )
+        storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode())
+        engine_config = self.get_analytical_engine_config()
+        if self._launcher.type() == types_pb2.HOSTS:
+            vineyard_endpoint = engine_config["vineyard_rpc_endpoint"]
+        else:
+            vineyard_endpoint = self._launcher._vineyard_internal_endpoint
+        vineyard_ipc_socket = engine_config["vineyard_socket"]
+        deployment, hosts = self._launcher.get_vineyard_stream_info()
+        path = op.attr[types_pb2.GRAPH_SERIALIZATION_PATH].s.decode()
+        obj_id = op.attr[types_pb2.VINEYARD_ID].i
+        logger.info("Serialize graph %d to %s", obj_id, path)
+        vineyard.io.serialize(
+            path,
+            vineyard.ObjectID(obj_id),
+            type="global",
+            vineyard_ipc_socket=vineyard_ipc_socket,
+            vineyard_endpoint=vineyard_endpoint,
+            storage_options=storage_options,
+            deployment=deployment,
+            hosts=hosts,
+        )
+        logger.info("Finished serialization")
+        return op_def_pb2.OpResult(code=OK, key=op.key)
+
+    def _process_deserialize_graph(self, op: op_def_pb2.OpDef):
+        try:
+            import vineyard
+            import vineyard.io
+        except ImportError:
+            raise RuntimeError(
+                "Deserializing graphs from locations requires 'vineyard', "
+                "please install those two dependencies via "
+                "\n"
+                "\n"
+                "    pip3 install vineyard vineyard-io"
+                "\n"
+                "\n"
+            )
+        storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode())
+        engine_config = self.get_analytical_engine_config()
+        if self._launcher.type() == types_pb2.HOSTS:
+            vineyard_endpoint = engine_config["vineyard_rpc_endpoint"]
+        else:
+            vineyard_endpoint = self._launcher._vineyard_internal_endpoint
+        vineyard_ipc_socket = engine_config["vineyard_socket"]
+        deployment, hosts = self._launcher.get_vineyard_stream_info()
+        path = op.attr[types_pb2.GRAPH_SERIALIZATION_PATH].s.decode()
+        logger.info("Deserialize graph from %s", path)
+        graph_id = vineyard.io.deserialize(
+            path,
+            type="global",
+            vineyard_ipc_socket=vineyard_ipc_socket,
+            vineyard_endpoint=vineyard_endpoint,
+            storage_options=storage_options,
+            deployment=deployment,
+            hosts=hosts,
+        )
+        logger.info("Finished deserialization, graph id: %d", graph_id)
+        # create graph_def
+        # run create graph on analytical engine
+        create_graph_op = create_single_op_dag(
+            types_pb2.CREATE_GRAPH,
+            config={
+                types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(
+                    graph_def_pb2.ARROW_PROPERTY
+                ),
+                types_pb2.OID_TYPE: utils.s_to_attr("int64_t"),
+                types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
+                types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(True),
+                types_pb2.VINEYARD_ID: utils.i_to_attr(int(graph_id)),
+            },
+        )
+        try:
+            response_head, response_body = self.run_on_analytical_engine(
+                create_graph_op, [], {}
+            )
+        except grpc.RpcError as e:
+            logger.error(
+                "Create graph failed, code: %s, details: %s",
+                e.code().name,
+                e.details(),
+            )
+            if e.code() == grpc.StatusCode.INTERNAL:
+                raise AnalyticalEngineInternalError(e.details())
+            else:
+                raise
+        logger.info("Create graph response head: %s, body: %s", response_head, response_body)
+        response_head.head.results[0].key = op.key
+        return response_head.head.results[0]
+
     def _process_data_sink(self, op: op_def_pb2.OpDef):
         import vineyard
         import vineyard.io
diff --git a/python/graphscope/framework/dag_utils.py b/python/graphscope/framework/dag_utils.py
index f6cf867b49b5..d1e3b7d2a18f 100644
--- a/python/graphscope/framework/dag_utils.py
+++ b/python/graphscope/framework/dag_utils.py
@@ -1103,3 +1103,47 @@ def archive_graph(graph, path):
         output_types=types_pb2.NULL_OUTPUT,
     )
     return op
+
+
+def save_graph_to(
+    graph,
+    path: str,
+    vineyard_id,
+    **kwargs,
+):
+    """Serialize graph to the specified location.
+
+    Args:
+        graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source graph.
+        path (str): The path to serialize the graph, on each worker.
+
+    Returns:
+        An op to serialize the graph to a path.
+ """ + config = { + types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path), + types_pb2.VINEYARD_ID: utils.i_to_attr(vineyard_id), + types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)), + } + op = Operation( + graph.session_id, + types_pb2.SERIALIZE_GRAPH, + config=config, + inputs=[graph.op], + output_types=types_pb2.NULL_OUTPUT, + ) + return op + + +def load_graph_from(path: str, sess, **kwargs): + config = { + types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path), + types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)), + } + op = Operation( + sess.session_id, + types_pb2.DESERIALIZE_GRAPH, + config=config, + output_types=types_pb2.GRAPH, + ) + return op diff --git a/python/graphscope/framework/graph.py b/python/graphscope/framework/graph.py index 0b4e0e173d2f..561bd7532cce 100644 --- a/python/graphscope/framework/graph.py +++ b/python/graphscope/framework/graph.py @@ -1078,42 +1078,10 @@ def save_to(self, path, **kwargs): Args: path (str): supported storages are local, hdfs, oss, s3 """ - try: - import vineyard - import vineyard.io - except ImportError: - raise RuntimeError( - "Saving context to locations requires 'vineyard', " - "please install those two dependencies via " - "\n" - "\n" - " pip3 install vineyard vineyard-io" - "\n" - "\n" - ) - sess = self._session - deployment = "kubernetes" if sess.info["type"] == "k8s" else "ssh" - conf = sess.info["engine_config"] - vineyard_endpoint = conf["vineyard_rpc_endpoint"] - vineyard_ipc_socket = conf["vineyard_socket"] - if sess.info["type"] == "k8s": - hosts = [ - "{}:{}".format(sess.info["namespace"], s) - for s in sess.info["engine_hosts"].split(",") - ] - else: # type == "hosts" - hosts = sess.info["engine_hosts"].split(",") - vineyard.io.serialize( - path, - vineyard.ObjectID(self._vineyard_id), - type="global", - vineyard_ipc_socket=vineyard_ipc_socket, - vineyard_endpoint=vineyard_endpoint, - storage_options=kwargs, - deployment=deployment, - hosts=hosts, - ) + op = dag_utils.save_graph_to(self, path, self._vineyard_id, **kwargs) + self._session.dag.add_op(op) + return self._session._wrapper(op) @classmethod def load_from(cls, path, sess, **kwargs): @@ -1131,41 +1099,8 @@ def load_from(cls, path, sess, **kwargs): `Graph`: A new graph object. Schema and data is supposed to be identical with the one that called serialized method. """ - try: - import vineyard - import vineyard.io - except ImportError: - raise RuntimeError( - "Saving context to locations requires 'vineyard', " - "please install those two dependencies via " - "\n" - "\n" - " pip3 install vineyard vineyard-io" - "\n" - "\n" - ) - - deployment = "kubernetes" if sess.info["type"] == "k8s" else "ssh" - conf = sess.info["engine_config"] - vineyard_endpoint = conf["vineyard_rpc_endpoint"] - vineyard_ipc_socket = conf["vineyard_socket"] - if sess.info["type"] == "k8s": - hosts = [ - "{}:{}".format(sess.info["namespace"], s) - for s in sess.info["engine_hosts"].split(",") - ] - else: # type == "hosts" - hosts = sess.info["engine_hosts"].split(",") - graph_id = vineyard.io.deserialize( - path, - type="global", - vineyard_ipc_socket=vineyard_ipc_socket, - vineyard_endpoint=vineyard_endpoint, - storage_options=kwargs, - deployment=deployment, - hosts=hosts, - ) - return sess._wrapper(GraphDAGNode(sess, vineyard.ObjectID(graph_id))) + op = dag_utils.load_graph_from(path, sess, **kwargs) + return sess._wrapper(GraphDAGNode(sess, op)) def archive(self, path): """Archive graph gar format files base on the graph info. 
diff --git a/python/graphscope/proto/types.proto b/python/graphscope/proto/types.proto
index f41f7d424101..c4e85367835a 100644
--- a/python/graphscope/proto/types.proto
+++ b/python/graphscope/proto/types.proto
@@ -104,6 +104,8 @@ enum OperationType {
   INDUCE_SUBGRAPH = 22;       // induce subgraph
   UNLOAD_CONTEXT = 23;        // unload context
   ARCHIVE_GRAPH = 24;         // archive graph
+  SERIALIZE_GRAPH = 25;       // serialize graph
+  DESERIALIZE_GRAPH = 26;     // deserialize graph
 
   SUBGRAPH = 32;              // subgraph in interactive query
   GREMLIN_QUERY = 33;         // queries on gremlin engine
@@ -252,6 +254,9 @@ enum ParamKey {
   CHUNK_TYPE = 342;
 
   GRAPH_LIBRARY_PATH = 400;
+  // serialization path
+  GRAPH_SERIALIZATION_PATH = 401;
+
   VFORMAT = 500;   // vertex input format
   EFORMAT = 501;   // edge input format
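For reference, a minimal usage sketch of the client-facing API after this patch: save_to and load_from now only build SERIALIZE_GRAPH / DESERIALIZE_GRAPH ops, and the coordinator drives vineyard.io when those ops are evaluated. The session setup, the dataset loader and the /tmp path below are illustrative assumptions, not part of this change; any extra keyword arguments are forwarded to vineyard.io through STORAGE_OPTIONS.

    import graphscope
    from graphscope.dataset import load_p2p_network  # any loaded property graph works here

    sess = graphscope.session(cluster_type="hosts")  # or a k8s session
    g = load_p2p_network(sess)

    # Builds a SERIALIZE_GRAPH op; the coordinator spawns the vineyard.io
    # serialization job against this path on each worker.
    g.save_to("/tmp/serialized_graph")

    # Builds a DESERIALIZE_GRAPH op; the coordinator deserializes via vineyard.io
    # and creates a new ArrowProperty graph from the resulting vineyard id.
    g2 = graphscope.Graph.load_from("/tmp/serialized_graph", sess)
    print(g2.schema)

    sess.close()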