diff --git a/analytical_engine/core/io/property_parser.h b/analytical_engine/core/io/property_parser.h
index a061ccf1f6a0..ccf394672592 100644
--- a/analytical_engine/core/io/property_parser.h
+++ b/analytical_engine/core/io/property_parser.h
@@ -138,7 +138,11 @@ inline void ParseVertex(std::shared_ptr& graph,
   vertex->label = attrs.at(rpc::LABEL).s();
   vertex->vid = attrs.at(rpc::VID).s();
   vertex->protocol = attrs.at(rpc::PROTOCOL).s();
-  vertex->values = data;
+  if (vertex->protocol == "pandas") {
+    vertex->values = data;
+  } else {
+    vertex->values = attrs.at(rpc::SOURCE).s();
+  }
   graph->vertices.push_back(vertex);
 }
 
@@ -163,7 +167,11 @@ inline void ParseEdge(std::shared_ptr& graph,
   sub_label.dst_vid = attrs.at(rpc::DST_VID).s();
   sub_label.load_strategy = attrs.at(rpc::LOAD_STRATEGY).s();
   sub_label.protocol = attrs.at(rpc::PROTOCOL).s();
-  sub_label.values = data;
+  if (sub_label.protocol == "pandas") {
+    sub_label.values = data;
+  } else {
+    sub_label.values = attrs.at(rpc::SOURCE).s();
+  }
   edge->sub_labels.push_back(sub_label);
 
   if (!has_edge_label) {
@@ -211,7 +219,7 @@ inline void DistributeChunk(const rpc::Chunk& chunk, int num,
   if (protocol == "pandas") {
     SplitTable(data, num, distributed_values);
   } else {
-    distributed_values.resize(num, data);
+    distributed_values.resize(num, attrs.at(rpc::SOURCE).s());
   }
   for (int i = 0; i < num; ++i) {
     distributed_chunk[i].set_buffer(std::move(distributed_values[i]));
diff --git a/analytical_engine/core/server/command_detail.cc b/analytical_engine/core/server/command_detail.cc
index 0b81c6cab633..82fa85160781 100644
--- a/analytical_engine/core/server/command_detail.cc
+++ b/analytical_engine/core/server/command_detail.cc
@@ -21,34 +21,78 @@ namespace gs {
 
 grape::InArchive& operator<<(grape::InArchive& archive,
                              const CommandDetail& cd) {
+  // type
+  archive << cd.type;
+  // params
   std::map<int, std::string> buffer;
   for (auto& pair : cd.params) {
     buffer[pair.first] = pair.second.SerializeAsString();
   }
-
-  archive << cd.type;
   archive << buffer;
-  archive << cd.large_attr.SerializeAsString();
+  // large attr
+  bool has_chunk_list = cd.large_attr.has_chunk_list();
+  archive << has_chunk_list;
+  if (has_chunk_list) {
+    size_t chunk_list_size = cd.large_attr.chunk_list().items().size();
+    archive << chunk_list_size;
+    for (size_t i = 0; i < chunk_list_size; ++i) {
+      const auto& chunk = cd.large_attr.chunk_list().items(i);
+      // buffer
+      archive << chunk.buffer();
+      // attr
+      std::map<int, std::string> attr;
+      for (auto& pair : chunk.attr()) {
+        attr[pair.first] = pair.second.SerializeAsString();
+      }
+      archive << attr;
+    }
+  }
+  // query_args
   archive << cd.query_args.SerializeAsString();
   return archive;
 }
 
 grape::OutArchive& operator>>(grape::OutArchive& archive, CommandDetail& cd) {
-  std::map<int, std::string> buffer;
-  std::string s_large_attr, s_args;
-
+  // type
   archive >> cd.type;
+  // params
+  std::map<int, std::string> buffer;
   archive >> buffer;
-  archive >> s_large_attr;
-  archive >> s_args;
-
   for (auto& pair : buffer) {
     rpc::AttrValue attr_value;
     attr_value.ParseFromString(pair.second);
     cd.params[pair.first] = attr_value;
   }
-  cd.large_attr.ParseFromString(s_large_attr);
+  // large attr
+  bool has_chunk_list;
+  archive >> has_chunk_list;
+  if (has_chunk_list) {
+    size_t chunk_list_size;
+    archive >> chunk_list_size;
+    if (chunk_list_size > 0) {
+      auto* chunk_list = cd.large_attr.mutable_chunk_list();
+      for (size_t i = 0; i < chunk_list_size; ++i) {
+        auto* chunk = chunk_list->add_items();
+        // buffer
+        std::string buf;
+        archive >> buf;
+        chunk->set_buffer(std::move(buf));
+        // attr
+        auto* mutable_attr = chunk->mutable_attr();
+        std::map<int, std::string> attr;
+        archive >> attr;
+        for (auto& pair : attr) {
+          rpc::AttrValue attr_value;
+          attr_value.ParseFromString(pair.second);
+          (*mutable_attr)[pair.first].CopyFrom(attr_value);
+        }
+      }
+    }
+  }
+  // query_args
+  std::string s_args;
+  archive >> s_args;
   cd.query_args.ParseFromString(s_args);
 
   return archive;
diff --git a/analytical_engine/core/server/graphscope_service.cc b/analytical_engine/core/server/graphscope_service.cc
index 93e95a555f5e..c028c9394632 100644
--- a/analytical_engine/core/server/graphscope_service.cc
+++ b/analytical_engine/core/server/graphscope_service.cc
@@ -15,6 +15,7 @@
 #include "core/server/graphscope_service.h"
 
+#include <queue>
 #include
 
 #include "google/protobuf/util/message_differencer.h"
@@ -24,22 +25,62 @@ namespace gs {
 namespace rpc {
 
-using ::grpc::Status;
-using ::grpc::StatusCode;
-
-Status GraphScopeService::HeartBeat(::grpc::ServerContext* context,
+Status GraphScopeService::HeartBeat(ServerContext* context,
                                     const HeartBeatRequest* request,
                                     HeartBeatResponse* response) {
   return Status::OK;
 }
 
-::grpc::Status GraphScopeService::RunStep(::grpc::ServerContext* context,
-                                          const RunStepRequest* request,
+::grpc::Status GraphScopeService::RunStep(ServerContext* context,
+                                          ServerReader<RunStepRequest>* stream,
                                           RunStepResponse* response) {
-  CHECK(request->has_dag_def());
-  const DagDef& dag_def = request->dag_def();
-  std::unordered_map op_key_to_result;
+  // ServerReaderWriter* stream) {
+  DagDef dag_def;
+  std::queue<std::string> chunks;
+  RunStepRequest request;
+  bool has_next = true;
+  // read stream request and join the chunk
+  while (stream->Read(&request)) {
+    if (request.has_head()) {
+      // head is always the first in the stream
+      // get a copy of 'dag_def' and set the 'large_attr' from body later.
+      dag_def = request.head().dag_def();
+    } else {
+      // body
+      if (chunks.empty() || has_next == false) {
+        chunks.push("");
+      }
+      auto& chunk = chunks.back();
+      chunk += request.body().chunk();
+      has_next = request.body().has_next();
+    }
+  }
+  // fill the chunks into dag_def
+  auto* ops = dag_def.mutable_op();
+  for (auto& op : *ops) {
+    LargeAttrValue large_attr = op.large_attr();
+    if (large_attr.has_chunk_meta_list()) {
+      auto* mutable_large_attr = op.mutable_large_attr();
+      auto* chunk_list = mutable_large_attr->mutable_chunk_list();
+      for (const auto& chunk_meta : large_attr.chunk_meta_list().items()) {
+        auto* chunk = chunk_list->add_items();
+        if (chunk_meta.size() > 0) {
+          // set buffer
+          chunk->set_buffer(std::move(chunks.front()));
+          chunks.pop();
+        }
+        // copy attr from chunk_meta
+        auto* mutable_attr = chunk->mutable_attr();
+        for (auto& attr : chunk_meta.attr()) {
+          (*mutable_attr)[attr.first].CopyFrom(attr.second);
+        }
+      }
+    }
+  }
+  assert(chunks.empty());
+  // execute the dag
+  std::unordered_map op_key_to_result;
 
   for (const auto& op : dag_def.op()) {
     OpResult* op_result = response->add_results();
     op_result->set_key(op.key());
diff --git a/analytical_engine/core/server/graphscope_service.h b/analytical_engine/core/server/graphscope_service.h
index f9aa038baf6f..dc7762353d9c 100644
--- a/analytical_engine/core/server/graphscope_service.h
+++ b/analytical_engine/core/server/graphscope_service.h
@@ -22,6 +22,7 @@
 #include
 
 #include "core/server/dispatcher.h"
+#include "proto/graphscope/proto/attr_value.pb.h"
 #include "proto/graphscope/proto/engine_service.grpc.pb.h"
 #include "proto/graphscope/proto/graph_def.pb.h"
 #include "proto/graphscope/proto/op_def.pb.h"
@@ -30,6 +31,8 @@ namespace gs {
 namespace rpc {
 
 using grpc::ServerContext;
+using ::grpc::ServerReader;
+using ::grpc::ServerReaderWriter;
 using grpc::Status;
 using grpc::StatusCode;
 
@@ -43,7 +46,7 @@ class GraphScopeService final : public EngineService::Service {
       : dispatcher_(std::move(dispatcher)) {}
 
   ::grpc::Status RunStep(::grpc::ServerContext* context,
-                         const RunStepRequest* request,
+                         ServerReader<RunStepRequest>* stream,
                          RunStepResponse* response) override;
 
   ::grpc::Status HeartBeat(::grpc::ServerContext* context,
diff --git a/analytical_engine/core/server/rpc_utils.h b/analytical_engine/core/server/rpc_utils.h
index 425079cc3e29..26d00a874cdb 100644
--- a/analytical_engine/core/server/rpc_utils.h
+++ b/analytical_engine/core/server/rpc_utils.h
@@ -29,6 +29,7 @@
 #include "core/error.h"
 #include "core/server/command_detail.h"
 #include "proto/graphscope/proto/attr_value.pb.h"
+#include "proto/graphscope/proto/message.pb.h"
 #include "proto/graphscope/proto/op_def.pb.h"
 #include "proto/graphscope/proto/types.pb.h"
diff --git a/coordinator/gscoordinator/coordinator.py b/coordinator/gscoordinator/coordinator.py
index cf07fd4732e8..59bc436b7d8b 100644
--- a/coordinator/gscoordinator/coordinator.py
+++ b/coordinator/gscoordinator/coordinator.py
@@ -46,6 +46,7 @@
 sys.stdout = StdStreamWrapper(sys.stdout)
 sys.stderr = StdStreamWrapper(sys.stderr)
 
+from graphscope.client.utils import GRPCUtils
 from graphscope.framework import utils
 from graphscope.framework.dag_utils import create_graph
 from graphscope.framework.dag_utils import create_loader
@@ -118,6 +119,7 @@ def __init__(self, launcher, dangling_timeout_seconds, log_level="INFO"):
         self._request = None
         self._object_manager = ObjectManager()
+        self._grpc_utils = GRPCUtils()
         self._dangling_detecting_timer = None
 
         self._config_logging(log_level)
@@ -310,8 +312,27 @@ def HeartBeat(self, request, context):
         return message_pb2.HeartBeatResponse()
 
     def run_on_analytical_engine(  # noqa: C901
-        self, session_id, dag_def: op_def_pb2.DagDef, op_results: list
+        self,
+        dag_def: op_def_pb2.DagDef,
+        dag_bodies,
+        op_results: list,
+        loader_op_bodies: dict,
     ):
+        def _generate_runstep_request(session_id, dag_def, dag_bodies):
+            runstep_requests = []
+            # head
+            runstep_requests.append(
+                message_pb2.RunStepRequest(
+                    head=message_pb2.RunStepRequestHead(
+                        session_id=session_id, dag_def=dag_def
+                    )
+                )
+            )
+            runstep_requests.extend(dag_bodies)
+            for item in runstep_requests:
+                yield item
+
+        # preprocess of op
         for op in dag_def.op:
             self._key_to_op[op.key] = op
             op_pre_process(
@@ -322,6 +343,15 @@ def run_on_analytical_engine(  # noqa: C901
                 engine_config=self._analytical_engine_config,
             )
 
+            # Handle op that depends on loader (data source)
+            if op.op == types_pb2.CREATE_GRAPH or op.op == types_pb2.ADD_LABELS:
+                for key_of_parent_op in op.parents:
+                    parent_op = self._key_to_op[key_of_parent_op]
+                    if parent_op.op == types_pb2.DATA_SOURCE:
+                        # handle bodies of loader op
+                        if parent_op.key in loader_op_bodies:
+                            dag_bodies.extend(loader_op_bodies[parent_op.key])
+
             # Compile app or not.
             if op.op == types_pb2.BIND_APP:
                 op, app_sig, app_lib_path = self._maybe_compile_app(op)
 
@@ -340,14 +370,12 @@ def run_on_analytical_engine(  # noqa: C901
                 or op.op == types_pb2.PROJECT_TO_SIMPLE
                 or op.op == types_pb2.ADD_LABELS
             ):
-                op = self._maybe_register_graph(op, session_id)
+                op = self._maybe_register_graph(op, self._session_id)
 
-        request = message_pb2.RunStepRequest(
-            session_id=self._session_id, dag_def=dag_def
-        )
+        requests = _generate_runstep_request(self._session_id, dag_def, dag_bodies)
         error = None  # n.b.: avoid raising deep nested error stack to users
         try:
-            response = self._analytical_engine_stub.RunStep(request)
+            response = self._analytical_engine_stub.RunStep(requests)
         except grpc.RpcError as e:
             logger.error(
                 "Engine RunStep failed, code: %s, details: %s",
@@ -417,9 +445,7 @@ def run_on_analytical_engine(  # noqa: C901
                 self._object_manager.pop(op.attr[types_pb2.APP_NAME].s.decode())
         return response.results
 
-    def run_on_interactive_engine(
-        self, session_id, dag_def: op_def_pb2.DagDef, op_results: list
-    ):
+    def run_on_interactive_engine(self, dag_def: op_def_pb2.DagDef, op_results: list):
         for op in dag_def.op:
             self._key_to_op[op.key] = op
             op_pre_process(
@@ -448,9 +474,7 @@ def run_on_interactive_engine(
             self._op_result_pool[op.key] = op_result
         return op_results
 
-    def run_on_learning_engine(
-        self, session_id, dag_def: op_def_pb2.DagDef, op_results: list
-    ):
+    def run_on_learning_engine(self, dag_def: op_def_pb2.DagDef, op_results: list):
         for op in dag_def.op:
             self._key_to_op[op.key] = op
             op_pre_process(
@@ -471,7 +495,11 @@
         return op_results
 
     def run_on_coordinator(
-        self, session_id, dag_def: op_def_pb2.DagDef, op_results: list
+        self,
+        dag_def: op_def_pb2.DagDef,
+        dag_bodies,
+        op_results: list,
+        loader_op_bodies: dict,
     ):
         for op in dag_def.op:
             self._key_to_op[op.key] = op
@@ -483,7 +511,7 @@
                 engine_config=self._analytical_engine_config,
             )
             if op.op == types_pb2.DATA_SOURCE:
-                op_result = self._process_data_source(op)
+                op_result = self._process_data_source(op, dag_bodies, loader_op_bodies)
             elif op.op == types_pb2.OUTPUT:
                 op_result = self._output(op)
             else:
@@ -498,30 +526,32 @@ def _make_response(code, msg, full_exc=b""):
             code=code, error_msg=msg, full_exception=full_exc
         )
 
-    def RunStep(self, request, context):
-        op_results = list()
+    def RunStep(self, request_iterator, context):
         # split dag
-        dag_manager = DAGManager(request.dag_def)
+        dag_manager = DAGManager(request_iterator)
+        op_results = []
+        loader_op_bodies = {}
+
         while not dag_manager.empty():
-            next_dag = dag_manager.get_next_dag()
-            run_dag_on, dag_def = next_dag
+            run_dag_on, dag, dag_bodies = dag_manager.next_dag()
             try:
                 if run_dag_on == GSEngine.analytical_engine:
+                    # need dag_bodies to load graph from pandas/numpy
                    error_code = error_codes_pb2.ANALYTICAL_ENGINE_INTERNAL_ERROR
                     self.run_on_analytical_engine(
-                        request.session_id, dag_def, op_results
+                        dag, dag_bodies, op_results, loader_op_bodies
                     )
                 elif run_dag_on == GSEngine.interactive_engine:
                     error_code = error_codes_pb2.INTERACTIVE_ENGINE_INTERNAL_ERROR
-                    self.run_on_interactive_engine(
-                        request.session_id, dag_def, op_results
-                    )
+                    self.run_on_interactive_engine(dag, op_results)
                 elif run_dag_on == GSEngine.learning_engine:
                     error_code = error_codes_pb2.LEARNING_ENGINE_INTERNAL_ERROR
-                    self.run_on_learning_engine(request.session_id, dag_def, op_results)
+                    self.run_on_learning_engine(dag, op_results)
                 elif run_dag_on == GSEngine.coordinator:
                     error_code = error_codes_pb2.COORDINATOR_INTERNAL_ERROR
-                    self.run_on_coordinator(request.session_id, dag_def, op_results)
+                    self.run_on_coordinator(
+                        dag, dag_bodies, op_results, loader_op_bodies
+                    )
             except grpc.RpcError as exc:
                 # Not raised by graphscope, maybe socket closed, etc
                 context.set_code(exc.code())
@@ -594,7 +624,7 @@ def _maybe_register_graph(self, op, session_id):
             )
             dag_def = op_def_pb2.DagDef()
             dag_def.op.extend([op_def])
-            register_request = message_pb2.RunStepRequest(
+            register_request = self._grpc_utils.generate_runstep_requests(
                 session_id=session_id, dag_def=dag_def
             )
             error = None  # n.b.: avoid raising deep nested error stack to users
@@ -785,7 +815,9 @@ def _output(self, op: op_def_pb2.OpDef):
             )
         return op_def_pb2.OpResult(code=error_codes_pb2.OK, key=op.key)
 
-    def _process_data_source(self, op: op_def_pb2.OpDef):
+    def _process_data_source(
+        self, op: op_def_pb2.OpDef, dag_bodies, loader_op_bodies: dict
+    ):
         def _spawn_vineyard_io_stream(source, storage_options, read_options):
             import vineyard
             import vineyard.io
@@ -814,7 +846,7 @@ def _process_loader_func(loader):
             # loader is type of attr_value_pb2.Chunk
             protocol = loader.attr[types_pb2.PROTOCOL].s.decode()
             if protocol in ("hdfs", "hive", "oss", "s3"):
-                source = loader.buffer.decode()
+                source = loader.attr[types_pb2.SOURCE].s.decode()
                 storage_options = json.loads(
                     loader.attr[types_pb2.STORAGE_OPTIONS].s.decode()
                 )
@@ -824,12 +856,18 @@ def _process_loader_func(loader):
                 new_protocol, new_source = _spawn_vineyard_io_stream(
                     source, storage_options, read_options
                 )
-                loader.buffer = new_source.encode("utf-8")
                 loader.attr[types_pb2.PROTOCOL].CopyFrom(utils.s_to_attr(new_protocol))
+                loader.attr[types_pb2.SOURCE].CopyFrom(utils.s_to_attr(new_source))
 
-        for loader in op.large_attr.chunk_list.items:
+        for loader in op.large_attr.chunk_meta_list.items:
             # handle vertex or edge loader
             if loader.attr[types_pb2.CHUNK_TYPE].s.decode() == "loader":
+                # set op bodies, this is for loading graph from numpy/pandas
+                op_bodies = []
+                for bodies in dag_bodies:
+                    if bodies.body.op_key == op.key:
+                        op_bodies.append(bodies)
+                loader_op_bodies[op.key] = op_bodies
                 _process_loader_func(loader)
         return op_def_pb2.OpResult(code=error_codes_pb2.OK, key=op.key)
 
@@ -894,8 +932,8 @@ def load_subgraph(oid_type, name):
             new_op_def.key = op.key
             dag = op_def_pb2.DagDef()
             dag.op.extend([new_op_def])
-            self.run_on_coordinator(self._session_id, coordinator_dag, [])
-            results = self.run_on_analytical_engine(self._session_id, dag, [])
+            self.run_on_coordinator(coordinator_dag, [], [], {})
+            results = self.run_on_analytical_engine(dag, [], [], {})
             logger.info("subgraph has been loaded")
             return results[-1]
 
@@ -1009,7 +1047,7 @@ def _cleanup(self, cleanup_instance=True, is_dangling=False):
 
         if unload_type:
             dag_def = create_single_op_dag(unload_type, config)
-            request = message_pb2.RunStepRequest(
+            request = self._grpc_utils.generate_runstep_requests(
                 session_id=self._session_id, dag_def=dag_def
            )
             try:
@@ -1051,12 +1089,12 @@ def _create_grpc_stub(self):
 
     def _get_engine_config(self):
         dag_def = create_single_op_dag(types_pb2.GET_ENGINE_CONFIG)
-        request = message_pb2.RunStepRequest(
+        requests = self._grpc_utils.generate_runstep_requests(
             session_id=self._session_id, dag_def=dag_def
         )
         error = None  # n.b.: avoid raising deep nested error stack to users
         try:
-            response = self._analytical_engine_stub.RunStep(request)
+            response = self._analytical_engine_stub.RunStep(requests)
         except grpc.RpcError as e:
             logger.error(
                 "Get engine config failed, code: %s, details: %s",
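For orientation, an illustrative sketch (not part of the patch): with the new protocol, one RunStep call is a stream of a single `head` message followed by zero or more `body` chunks, each tagged with the key of the op that owns it. The names `session_id`, `dag_def`, `op_key`, and `payload_chunks` below are assumed inputs.

```python
# Hypothetical helper mirroring the generator above; not part of the patch.
from graphscope.proto import message_pb2


def runstep_stream(session_id, dag_def, op_key, payload_chunks):
    # head: carries the DagDef whose large_attr now holds only chunk metas
    yield message_pb2.RunStepRequest(
        head=message_pb2.RunStepRequestHead(session_id=session_id, dag_def=dag_def)
    )
    # bodies: raw bytes of one loader chunk, flagged with has_next until the last piece
    for i, chunk in enumerate(payload_chunks):
        yield message_pb2.RunStepRequest(
            body=message_pb2.RunStepRequestBody(
                chunk=chunk, op_key=op_key, has_next=(i + 1 < len(payload_chunks))
            )
        )
```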
diff --git a/coordinator/gscoordinator/dag_manager.py b/coordinator/gscoordinator/dag_manager.py
index c222f517d260..eb67bdd107ce 100644
--- a/coordinator/gscoordinator/dag_manager.py
+++ b/coordinator/gscoordinator/dag_manager.py
@@ -19,7 +19,9 @@
 import copy
 import queue
 from enum import Enum
+from typing import Sequence
 
+from graphscope.proto import message_pb2
 from graphscope.proto import op_def_pb2
 from graphscope.proto import types_pb2
 
@@ -68,42 +70,61 @@ class DAGManager(object):
         types_pb2.OUTPUT,  # spawn an io stream to read/write data from/to vineyard
     ]
 
-    def __init__(self, dag_def: op_def_pb2.DagDef):
-        self._dag_def = dag_def
-        self._split_dag_def_queue = queue.Queue()
-
+    def __init__(self, request_iterator: Sequence[message_pb2.RunStepRequest]):
+        self._dag_queue = queue.Queue()
+        req_head = None
+        # a list of chunks
+        req_bodies = []
+        for req in request_iterator:
+            if req.HasField("head"):
+                req_head = req
+            else:
+                req_bodies.append(req)
         # split dag
-        split_dag_def = op_def_pb2.DagDef()
-        split_dag_def_for = GSEngine.analytical_engine
-        for op in self._dag_def.op:
-            if op.op in self._analytical_engine_split_op:
-                if split_dag_def.op:
-                    self._split_dag_def_queue.put((split_dag_def_for, split_dag_def))
-                    split_dag_def = op_def_pb2.DagDef()
-                split_dag_def_for = GSEngine.analytical_engine
-            if op.op in self._interactive_engine_split_op:
-                if split_dag_def.op:
-                    self._split_dag_def_queue.put((split_dag_def_for, split_dag_def))
-                    split_dag_def = op_def_pb2.DagDef()
-                split_dag_def_for = GSEngine.interactive_engine
-            if op.op in self._learning_engine_split_op:
-                if split_dag_def.op:
-                    self._split_dag_def_queue.put((split_dag_def_for, split_dag_def))
-                    split_dag_def = op_def_pb2.DagDef()
-                split_dag_def_for = GSEngine.learning_engine
-            if op.op in self._coordinator_split_op:
-                if split_dag_def.op:
-                    self._split_dag_def_queue.put((split_dag_def_for, split_dag_def))
-                    split_dag_def = op_def_pb2.DagDef()
-                split_dag_def_for = GSEngine.coordinator
-            split_dag_def.op.extend([copy.deepcopy(op)])
-        if len(split_dag_def.op) > 0:
-            self._split_dag_def_queue.put((split_dag_def_for, split_dag_def))
+        dag = op_def_pb2.DagDef()
+        dag_for = GSEngine.analytical_engine
+        dag_bodies = []
+        for op in req_head.head.dag_def.op:
+            if self.is_splited_op(op):
+                if dag.op:
+                    self._dag_queue.put((dag_for, dag, dag_bodies))
+                # init empty dag
+                dag = op_def_pb2.DagDef()
+                dag_for = self.get_op_exec_engine(op)
+                dag_bodies = []
+            # select op
+            dag.op.extend([copy.deepcopy(op)])
+            for req_body in req_bodies:
+                # select chunks belong to this op
+                if req_body.body.op_key == op.key:
+                    dag_bodies.append(req_body)
+        if dag.op:
+            self._dag_queue.put((dag_for, dag, dag_bodies))
+
+    def is_splited_op(self, op):
+        return op.op in (
+            self._analytical_engine_split_op
+            + self._interactive_engine_split_op
+            + self._learning_engine_split_op
+            + self._coordinator_split_op
+        )
+
+    def get_op_exec_engine(self, op):
+        op_type = op.op
+        if op_type in self._analytical_engine_split_op:
+            return GSEngine.analytical_engine
+        if op_type in self._interactive_engine_split_op:
+            return GSEngine.interactive_engine
+        if op_type in self._learning_engine_split_op:
+            return GSEngine.learning_engine
+        if op_type in self._coordinator_split_op:
+            return GSEngine.coordinator
+        raise RuntimeError("Op {0} get execution engine failed.".format(op_type))
 
     def empty(self):
-        return self._split_dag_def_queue.empty()
+        return self._dag_queue.empty()
 
-    def get_next_dag(self):
-        if not self._split_dag_def_queue.empty():
-            return self._split_dag_def_queue.get()
-        return None
+    def next_dag(self):
+        if not self._dag_queue.empty():
+            return self._dag_queue.get()
+        raise RuntimeError("Get element from empty queue.")
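An illustrative usage sketch (not part of the patch) of how a consumer drains the new DAGManager; `request_iterator` stands for the RunStepRequest stream received by the coordinator's RunStep, and the import path is assumed:

```python
# Hypothetical sketch; import path and the engine dispatch are assumptions.
from gscoordinator.dag_manager import DAGManager, GSEngine


def dispatch(request_iterator):
    dag_manager = DAGManager(request_iterator)
    while not dag_manager.empty():
        # next_dag() yields the target engine, a DagDef slice, and the
        # RunStepRequest bodies whose op_key matches an op in that slice.
        run_dag_on, dag, dag_bodies = dag_manager.next_dag()
        if run_dag_on == GSEngine.analytical_engine:
            ...  # forward `dag` plus `dag_bodies` to the analytical engine
        elif run_dag_on == GSEngine.coordinator:
            ...  # e.g. spawn vineyard io streams for DATA_SOURCE ops
```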
diff --git a/k8s/Makefile b/k8s/Makefile
index 80f3d9f64f38..f7d263ff202a 100644
--- a/k8s/Makefile
+++ b/k8s/Makefile
@@ -201,7 +201,7 @@ graphscope-client-manylinux2014-py3:
 		cmake -DWITH_VINEYARD=ON .. && \
 		make graphlearn_shared -j`nproc` && \
 		export LD_LIBRARY_PATH=/work/learning_engine/graph-learn/built/lib:$$LD_LIBRARY_PATH && \
-		for py in cp36-cp36m cp37-cp37m cp38-cp38 cp39-cp39 ; do \
+		for py in cp37-cp37m cp38-cp38 cp39-cp39 ; do \
 			cd /work/python; \
 			export PATH=/opt/python/$$py/bin:$$PATH; \
 			pip3 install -U pip numpy==1.18.5 auditwheel==5.0.0 grpcio grpcio_tools wheel ; \
diff --git a/proto/graphscope/proto/attr_value.proto b/proto/graphscope/proto/attr_value.proto
index 76337a8c56b3..4d9ecc471e53 100644
--- a/proto/graphscope/proto/attr_value.proto
+++ b/proto/graphscope/proto/attr_value.proto
@@ -76,8 +76,8 @@ message Chunk {
 }
 
 message ChunkMeta {
+  // total buffer size of the chunk
   int64 size = 1;
-  string op_key = 2;
   map attr = 3;
 }
diff --git a/proto/graphscope/proto/coordinator_service.proto b/proto/graphscope/proto/coordinator_service.proto
index 432355884e4c..465c5f4587e8 100644
--- a/proto/graphscope/proto/coordinator_service.proto
+++ b/proto/graphscope/proto/coordinator_service.proto
@@ -26,7 +26,7 @@ service CoordinatorService {
   rpc HeartBeat(HeartBeatRequest) returns (HeartBeatResponse);
 
   // Drives the graph computation.
-  rpc RunStep(RunStepRequest) returns (RunStepResponse);
+  rpc RunStep(stream RunStepRequest) returns (RunStepResponse);
 
   // Fetch engine logs.
   rpc FetchLogs(FetchLogsRequest) returns (stream FetchLogsResponse);
diff --git a/proto/graphscope/proto/engine_service.proto b/proto/graphscope/proto/engine_service.proto
index 07f278f4747e..7258227fadab 100644
--- a/proto/graphscope/proto/engine_service.proto
+++ b/proto/graphscope/proto/engine_service.proto
@@ -21,7 +21,7 @@ import "graphscope/proto/message.proto";
 
 service EngineService {
   // Drives the graph computation.
-  rpc RunStep(RunStepRequest) returns (RunStepResponse);
+  rpc RunStep(stream RunStepRequest) returns (RunStepResponse);
 
   rpc HeartBeat(HeartBeatRequest) returns (HeartBeatResponse);
 }
diff --git a/proto/graphscope/proto/message.proto b/proto/graphscope/proto/message.proto
index f881b015a157..d19a1d2caa76 100644
--- a/proto/graphscope/proto/message.proto
+++ b/proto/graphscope/proto/message.proto
@@ -77,16 +77,28 @@ message HeartBeatResponse {
 //
 ////////////////////////////////////////////////////////////////////////////////
 
-message RunStepRequest {
+message RunStepRequestHead {
   // REQUIRED: session_id must be returned by a CreateSession call
   // to the same master service.
   string session_id = 1;
 
   // REQUIRED: A Dag with op that will be evaluated.
-  // A DagDef will contain only 1 op(ideally).
   DagDef dag_def = 2;
 }
 
+message RunStepRequestBody {
+  bytes chunk = 1;
+  string op_key = 2;
+  bool has_next = 3;
+}
+
+message RunStepRequest {
+  oneof value {
+    RunStepRequestHead head = 1;
+    RunStepRequestBody body = 2;
+  }
+}
+
 message RunStepResponse {
   // list of result of ops in dag
   repeated OpResult results = 1;
diff --git a/proto/graphscope/proto/types.proto b/proto/graphscope/proto/types.proto
index 56eda15e436e..0ac1394a26dd 100644
--- a/proto/graphscope/proto/types.proto
+++ b/proto/graphscope/proto/types.proto
@@ -234,6 +234,7 @@ enum ParamKey {
   STORAGE_OPTIONS = 321;
   READ_OPTIONS = 322;
   FD = 323;  // file descriptor
+  SOURCE = 324;
 
   // large attr
   CHUNK_NAME = 341;
diff --git a/python/graphscope/client/rpc.py b/python/graphscope/client/rpc.py
index 05f90eb01570..e9584a909e2b 100644
--- a/python/graphscope/client/rpc.py
+++ b/python/graphscope/client/rpc.py
@@ -27,6 +27,7 @@
 import grpc
 
 from graphscope.client.utils import GS_GRPC_MAX_MESSAGE_LENGTH
+from graphscope.client.utils import GRPCUtils
 from graphscope.framework.errors import FatalError
 from graphscope.framework.errors import GRPCError
 from graphscope.proto import coordinator_service_pb2_grpc
@@ -95,6 +96,7 @@ def __init__(self, launcher, endpoint, reconnect=False):
             ("grpc.max_metadata_size", GS_GRPC_MAX_MESSAGE_LENGTH),
         ]
         self._launcher = launcher
+        self._grpc_utils = GRPCUtils()
         self._channel = grpc.insecure_channel(endpoint, options=options)
         self._stub = coordinator_service_pb2_grpc.CoordinatorServiceStub(self._channel)
         self._session_id = None
@@ -142,7 +144,10 @@ def __repr__(self):
         return str(self)
 
     def run(self, dag_def):
-        return self._run_step_impl(dag_def)
+        runstep_requests = self._grpc_utils.generate_runstep_requests(
+            self._session_id, dag_def
+        )
+        return self._run_step_impl(runstep_requests)
 
     def fetch_logs(self):
         if self._logs_fetching_thread is None:
@@ -213,11 +218,8 @@ def _close_session_impl(self):
         return response
 
     @catch_grpc_error
-    def _run_step_impl(self, dag_def):
-        request = message_pb2.RunStepRequest(
-            session_id=self._session_id, dag_def=dag_def
-        )
-        response = self._stub.RunStep(request)
+    def _run_step_impl(self, runstep_requests):
+        response = self._stub.RunStep(runstep_requests)
         if response.code != error_codes_pb2.OK:
             logger.error(
                 "Runstep failed with code: %s, message: %s",
diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py
index 34171948a03f..dcf953d7805f 100755
--- a/python/graphscope/client/session.py
+++ b/python/graphscope/client/session.py
@@ -95,7 +95,7 @@ def __init__(self, dag, fetches):
             if hasattr(fetch, "op"):
                 fetch = fetch.op
             if not isinstance(fetch, Operation):
-                raise ValueError("Expect a `Operation` in sess run method.")
+                raise ValueError("Expect an `Operation` in sess run method.")
             self._ops.append(fetch)
         # extract sub dag
         self._sub_dag = dag.extract_subdag_for(self._ops)
diff --git a/python/graphscope/client/utils.py b/python/graphscope/client/utils.py
index 77e9db1775b8..86cd0fabb85a 100644
--- a/python/graphscope/client/utils.py
+++ b/python/graphscope/client/utils.py
@@ -18,11 +18,14 @@
 
 import inspect
 import logging
+import os
 import signal
 import sys
 from functools import wraps
 
 from graphscope.config import GSConfig as gs_config
+from graphscope.proto import attr_value_pb2
+from graphscope.proto import message_pb2
 
 logger = logging.getLogger("graphscope")
 
@@ -30,6 +33,77 @@
 GS_GRPC_MAX_MESSAGE_LENGTH = 2 * 1024 * 1024 * 1024 - 1
 
 
+class GRPCUtils(object):
+    # default to 256MB
+    CHUNK_SIZE = (
+        int(os.environ["GS_GRPC_CHUNK_SIZE"])
"GS_GRPC_CHUNK_SIZE" in os.environ + else 256 * 1024 * 1024 - 1 + ) + + def _generate_chunk_meta(self, chunk): + chunk_meta = attr_value_pb2.ChunkMeta() + chunk_meta.size = len(chunk.buffer) + for k, v in chunk.attr.items(): + chunk_meta.attr[k].CopyFrom(v) + return chunk_meta + + def split(self, dag_def): + """Traverse `large_attr` of op and split into a list of chunks. + + Note that this method will modify `large_attr` attribute of op in dag_def. + + Returns: + Sequence[Sequence[bytes]]: splited chunks. + """ + chunks_list = [] + for op in dag_def.op: + large_attr = attr_value_pb2.LargeAttrValue() + for chunk in op.large_attr.chunk_list.items: + # construct chunk meta + large_attr.chunk_meta_list.items.extend( + [self._generate_chunk_meta(chunk)] + ) + # split buffer + chunks_list.append( + ( + [ + chunk.buffer[i : i + self.CHUNK_SIZE] + for i in range(0, len(chunk.buffer), self.CHUNK_SIZE) + ], + op.key, + ) + ) + # replace chunk with chunk_meta + op.large_attr.CopyFrom(large_attr) + return chunks_list + + def generate_runstep_requests(self, session_id, dag_def): + runstep_requests = [] + chunks_list = self.split(dag_def) + # head + runstep_request = message_pb2.RunStepRequest( + head=message_pb2.RunStepRequestHead(session_id=session_id, dag_def=dag_def) + ) + runstep_requests.append(runstep_request) + # bodies + for chunks, op_key in chunks_list: + for i, chunk in enumerate(chunks): + # check the last element + has_next = True + if i + 1 == len(chunks): + has_next = False + runstep_request = message_pb2.RunStepRequest( + body=message_pb2.RunStepRequestBody( + chunk=chunk, op_key=op_key, has_next=has_next + ) + ) + runstep_requests.append(runstep_request) + # return a generator for stream request + for item in runstep_requests: + yield item + + class ConditionalFormatter(logging.Formatter): """Provide an option to disable format for some messages. Taken from https://stackoverflow.com/questions/34954373/disable-format-for-some-messages diff --git a/python/graphscope/framework/loader.py b/python/graphscope/framework/loader.py index 370b3e6b37b4..e2a817082118 100644 --- a/python/graphscope/framework/loader.py +++ b/python/graphscope/framework/loader.py @@ -246,11 +246,11 @@ def get_attr(self): # Maybe handled by vineyard in the near future if self.protocol == "file": source = "{}#{}".format(self.source, self.options) - config[types_pb2.VALUES] = source.encode("utf-8") + config[types_pb2.SOURCE] = utils.s_to_attr(source) elif self.protocol == "pandas": config[types_pb2.VALUES] = self.source else: # Let vineyard handle other data source. 
diff --git a/python/graphscope/framework/loader.py b/python/graphscope/framework/loader.py
index 370b3e6b37b4..e2a817082118 100644
--- a/python/graphscope/framework/loader.py
+++ b/python/graphscope/framework/loader.py
@@ -246,11 +246,11 @@ def get_attr(self):
         # Maybe handled by vineyard in the near future
         if self.protocol == "file":
             source = "{}#{}".format(self.source, self.options)
-            config[types_pb2.VALUES] = source.encode("utf-8")
+            config[types_pb2.SOURCE] = utils.s_to_attr(source)
         elif self.protocol == "pandas":
             config[types_pb2.VALUES] = self.source
         else:  # Let vineyard handle other data source.
-            config[types_pb2.VALUES] = self.source.encode("utf-8")
+            config[types_pb2.SOURCE] = utils.s_to_attr(self.source)
         if self.protocol != "vineyard":
             # need spawn an io stream in coordinator
             config[types_pb2.STORAGE_OPTIONS] = utils.s_to_attr(
diff --git a/python/graphscope/tests/conftest.py b/python/graphscope/tests/conftest.py
index e77451f46494..b60fdf086630 100644
--- a/python/graphscope/tests/conftest.py
+++ b/python/graphscope/tests/conftest.py
@@ -423,6 +423,19 @@ def p2p_property_graph(graphscope_session):
     g.unload()
 
 
+@pytest.fixture(scope="module")
+def p2p_graph_from_pandas(graphscope_session):
+    # set chunk size to 1k
+    os.environ["GS_GRPC_CHUNK_SIZE"] = str(1024 - 1)
+    df_v = pd.read_csv(f"{property_dir}/p2p-31_property_v_0", sep=",")
+    df_e = pd.read_csv(f"{property_dir}/p2p-31_property_e_0", sep=",")
+    g = graphscope_session.g(generate_eid=False)
+    g = g.add_vertices(df_v, "person")
+    g = g.add_edges(df_e, label="knows", src_label="person", dst_label="person")
+    yield g
+    g.unload()
+
+
 @pytest.fixture(scope="module")
 def p2p_property_graph_string(graphscope_session):
     g = graphscope_session.g(oid_type="string", generate_eid=False)
diff --git a/python/graphscope/tests/unittest/test_app.py b/python/graphscope/tests/unittest/test_app.py
index 48987424187a..77aa5e3ffa7e 100644
--- a/python/graphscope/tests/unittest/test_app.py
+++ b/python/graphscope/tests/unittest/test_app.py
@@ -68,6 +68,18 @@ def test_run_app_on_property_graph(arrow_property_graph, twitter_sssp_result):
     assert np.allclose(r1, twitter_sssp_result)
 
 
+@pytest.mark.skipif("FULL_TEST_SUITE" not in os.environ, reason="Run in nightly CI")
+def test_run_app_on_pandas_graph(p2p_graph_from_pandas, sssp_result):
+    ctx1 = sssp(p2p_graph_from_pandas, src=6, weight="dist")
+    r1 = (
+        ctx1.to_dataframe({"node": "v.id", "r": "r"})
+        .sort_values(by=["node"])
+        .to_numpy(dtype=float)
+    )
+    r1[r1 == 1.7976931348623157e308] = float("inf")  # replace limit::max with inf
+    assert np.allclose(r1, sssp_result["directed"])
+
+
 def test_run_app_on_directed_graph(
     p2p_project_directed_graph,
     sssp_result,