Skip to content

Commit

Permalink
Stream RunStepRequest of grpc to support loading graph from numpy/pandas more than 2GB (#1309)
Browse files Browse the repository at this point in the history

* Test chunk split in nightly CI
  • Loading branch information
lidongze0629 committed Feb 9, 2022
1 parent cdb0dcf commit 6918b76
Show file tree
Hide file tree
Showing 19 changed files with 378 additions and 108 deletions.
14 changes: 11 additions & 3 deletions analytical_engine/core/io/property_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ inline void ParseVertex(std::shared_ptr<detail::Graph>& graph,
vertex->label = attrs.at(rpc::LABEL).s();
vertex->vid = attrs.at(rpc::VID).s();
vertex->protocol = attrs.at(rpc::PROTOCOL).s();
vertex->values = data;
if (vertex->protocol == "pandas") {
vertex->values = data;
} else {
vertex->values = attrs.at(rpc::SOURCE).s();
}
graph->vertices.push_back(vertex);
}

Expand All @@ -163,7 +167,11 @@ inline void ParseEdge(std::shared_ptr<detail::Graph>& graph,
sub_label.dst_vid = attrs.at(rpc::DST_VID).s();
sub_label.load_strategy = attrs.at(rpc::LOAD_STRATEGY).s();
sub_label.protocol = attrs.at(rpc::PROTOCOL).s();
sub_label.values = data;
if (sub_label.protocol == "pandas") {
sub_label.values = data;
} else {
sub_label.values = attrs.at(rpc::SOURCE).s();
}
edge->sub_labels.push_back(sub_label);

if (!has_edge_label) {
Expand Down Expand Up @@ -211,7 +219,7 @@ inline void DistributeChunk(const rpc::Chunk& chunk, int num,
if (protocol == "pandas") {
SplitTable(data, num, distributed_values);
} else {
distributed_values.resize(num, data);
distributed_values.resize(num, attrs.at(rpc::SOURCE).s());
}
for (int i = 0; i < num; ++i) {
distributed_chunk[i].set_buffer(std::move(distributed_values[i]));
Expand Down
64 changes: 54 additions & 10 deletions analytical_engine/core/server/command_detail.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,78 @@ namespace gs {

/**
 * Serialize a CommandDetail into a grape archive.
 *
 * Fields are written in this order, and the matching operator>> must read
 * them back in exactly the same order:
 *   1. cd.type
 *   2. cd.params, as a map<int, std::string> whose values are the
 *      serialized form of each entry
 *   3. a bool telling whether cd.large_attr carries a chunk list; when it
 *      does, the chunk count (size_t) followed, for every chunk, by its raw
 *      buffer and then its attr map (values serialized the same way as params)
 *   4. cd.query_args, serialized as a single string
 *
 * cd.type is written exactly once, and large_attr is written chunk by chunk
 * only. It is deliberately NOT also emitted through
 * large_attr.SerializeAsString(): that would duplicate the payload in the
 * stream and require the whole large attribute to be serialized as one
 * message (the surrounding change targets inputs larger than 2GB).
 *
 * @param archive destination archive
 * @param cd      command to serialize
 * @return the same archive, to allow chaining
 */
grape::InArchive& operator<<(grape::InArchive& archive,
                             const CommandDetail& cd) {
  // type
  archive << cd.type;

  // params
  std::map<int, std::string> buffer;
  for (const auto& pair : cd.params) {
    buffer[pair.first] = pair.second.SerializeAsString();
  }
  archive << buffer;

  // large attr
  bool has_chunk_list = cd.large_attr.has_chunk_list();
  archive << has_chunk_list;
  if (has_chunk_list) {
    // The count is written as size_t to match the type read by operator>>.
    size_t chunk_list_size = cd.large_attr.chunk_list().items().size();
    archive << chunk_list_size;
    for (size_t i = 0; i < chunk_list_size; ++i) {
      const auto& chunk = cd.large_attr.chunk_list().items(i);
      // buffer
      archive << chunk.buffer();
      // attr
      std::map<int, std::string> attr;
      for (const auto& pair : chunk.attr()) {
        attr[pair.first] = pair.second.SerializeAsString();
      }
      archive << attr;
    }
  }

  // query_args
  archive << cd.query_args.SerializeAsString();

  return archive;
}

grape::OutArchive& operator>>(grape::OutArchive& archive, CommandDetail& cd) {
std::map<int, std::string> buffer;
std::string s_large_attr, s_args;

// type
archive >> cd.type;
// params
std::map<int, std::string> buffer;
archive >> buffer;
archive >> s_large_attr;
archive >> s_args;

for (auto& pair : buffer) {
rpc::AttrValue attr_value;
attr_value.ParseFromString(pair.second);
cd.params[pair.first] = attr_value;
}
cd.large_attr.ParseFromString(s_large_attr);
// large attr
bool has_chunk_list;
archive >> has_chunk_list;
if (has_chunk_list) {
size_t chunk_list_size;
archive >> chunk_list_size;
if (chunk_list_size > 0) {
auto* chunk_list = cd.large_attr.mutable_chunk_list();
for (size_t i = 0; i < chunk_list_size; ++i) {
auto* chunk = chunk_list->add_items();
// buffer
std::string buf;
archive >> buf;
chunk->set_buffer(std::move(buf));
// attr
auto* mutable_attr = chunk->mutable_attr();
std::map<int, std::string> attr;
archive >> attr;
for (auto& pair : attr) {
rpc::AttrValue attr_value;
attr_value.ParseFromString(pair.second);
(*mutable_attr)[pair.first].CopyFrom(attr_value);
}
}
}
}
// query_args
std::string s_args;
archive >> s_args;
cd.query_args.ParseFromString(s_args);

return archive;
Expand Down
59 changes: 50 additions & 9 deletions analytical_engine/core/server/graphscope_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "core/server/graphscope_service.h"

#include <queue>
#include <unordered_map>

#include "google/protobuf/util/message_differencer.h"
Expand All @@ -24,22 +25,62 @@
namespace gs {
namespace rpc {

using ::grpc::Status;
using ::grpc::StatusCode;

/**
 * Handle a HeartBeat RPC.
 *
 * None of the arguments are inspected and the response is left untouched;
 * the call always reports success.
 *
 * @param context  server context of the call (unused)
 * @param request  incoming heartbeat request (unused)
 * @param response outgoing heartbeat response (not modified)
 * @return Status::OK unconditionally
 */
Status GraphScopeService::HeartBeat(ServerContext* context,
                                    const HeartBeatRequest* request,
                                    HeartBeatResponse* response) {
  return Status::OK;
}

::grpc::Status GraphScopeService::RunStep(::grpc::ServerContext* context,
const RunStepRequest* request,
::grpc::Status GraphScopeService::RunStep(ServerContext* context,
ServerReader<RunStepRequest>* stream,
RunStepResponse* response) {
CHECK(request->has_dag_def());
const DagDef& dag_def = request->dag_def();
std::unordered_map<std::string, OpResult*> op_key_to_result;
// ServerReaderWriter<RunStepRequest, RunStepResponse>* stream) {
DagDef dag_def;
std::queue<std::string> chunks;
RunStepRequest request;
bool has_next = true;
// read stream request and join the chunk
while (stream->Read(&request)) {
if (request.has_head()) {
// head is always the first in the stream
// get a copy of 'dag_def' and set the 'large_attr' from body later.
dag_def = request.head().dag_def();
} else {
// body
if (chunks.empty() || has_next == false) {
chunks.push("");
}
auto& chunk = chunks.back();
chunk += request.body().chunk();
has_next = request.body().has_next();
}
}
// fill the chunks into dag_def
auto* ops = dag_def.mutable_op();
for (auto& op : *ops) {
LargeAttrValue large_attr = op.large_attr();
if (large_attr.has_chunk_meta_list()) {
auto* mutable_large_attr = op.mutable_large_attr();
auto* chunk_list = mutable_large_attr->mutable_chunk_list();
for (const auto& chunk_meta : large_attr.chunk_meta_list().items()) {
auto* chunk = chunk_list->add_items();
if (chunk_meta.size() > 0) {
// set buffer
chunk->set_buffer(std::move(chunks.front()));
chunks.pop();
}
// copy attr from chunk_meta
auto* mutable_attr = chunk->mutable_attr();
for (auto& attr : chunk_meta.attr()) {
(*mutable_attr)[attr.first].CopyFrom(attr.second);
}
}
}
}
assert(chunks.empty());

// execute the dag
std::unordered_map<std::string, OpResult*> op_key_to_result;
for (const auto& op : dag_def.op()) {
OpResult* op_result = response->add_results();
op_result->set_key(op.key());
Expand Down
5 changes: 4 additions & 1 deletion analytical_engine/core/server/graphscope_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <utility>

#include "core/server/dispatcher.h"
#include "proto/graphscope/proto/attr_value.pb.h"
#include "proto/graphscope/proto/engine_service.grpc.pb.h"
#include "proto/graphscope/proto/graph_def.pb.h"
#include "proto/graphscope/proto/op_def.pb.h"
Expand All @@ -30,6 +31,8 @@ namespace gs {
namespace rpc {

using grpc::ServerContext;
using ::grpc::ServerReader;
using ::grpc::ServerReaderWriter;
using grpc::Status;
using grpc::StatusCode;

Expand All @@ -43,7 +46,7 @@ class GraphScopeService final : public EngineService::Service {
: dispatcher_(std::move(dispatcher)) {}

::grpc::Status RunStep(::grpc::ServerContext* context,
const RunStepRequest* request,
ServerReader<RunStepRequest>* stream,
RunStepResponse* response) override;

::grpc::Status HeartBeat(::grpc::ServerContext* context,
Expand Down
1 change: 1 addition & 0 deletions analytical_engine/core/server/rpc_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "core/error.h"
#include "core/server/command_detail.h"
#include "proto/graphscope/proto/attr_value.pb.h"
#include "proto/graphscope/proto/message.pb.h"
#include "proto/graphscope/proto/op_def.pb.h"
#include "proto/graphscope/proto/types.pb.h"

Expand Down
Loading

0 comments on commit 6918b76

Please sign in to comment.