Skip to content

Commit

Permalink
Add 'large_attr' data structure in op definition for large chunk (data of pandas/numpy) (#1303)
Browse files Browse the repository at this point in the history

* shrink grpc max_len for debug
* Add LargeAttrValue in op
* update
* update
* Make CommandDetail shared_ptr
* include memory in rpc_utils.h
  • Loading branch information
lidongze0629 committed Jan 27, 2022
1 parent e554e93 commit 10f2580
Show file tree
Hide file tree
Showing 22 changed files with 242 additions and 416 deletions.
10 changes: 5 additions & 5 deletions analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1056,11 +1056,11 @@ bl::result<void> GrapeInstance::registerGraphType(const rpc::GSParams& params) {
}

bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
const CommandDetail& cmd) {
std::shared_ptr<CommandDetail> cmd) {
auto r = std::make_shared<DispatchResult>(comm_spec_.worker_id());
rpc::GSParams params(cmd.params);
rpc::GSParams params(cmd->params, cmd->large_attr);

switch (cmd.type) {
switch (cmd->type) {
case rpc::CREATE_GRAPH: {
BOOST_LEAF_AUTO(graph_def, loadGraph(params));
r->set_graph_def(graph_def);
Expand All @@ -1076,7 +1076,7 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
break;
}
case rpc::RUN_APP: {
BOOST_LEAF_AUTO(context_key, query(params, cmd.query_args));
BOOST_LEAF_AUTO(context_key, query(params, cmd->query_args));
r->set_data(context_key);
break;
}
Expand Down Expand Up @@ -1273,7 +1273,7 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
}
default:
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"Unsupported command type: " + std::to_string(cmd.type));
"Unsupported command type: " + std::to_string(cmd->type));
}
return r;
}
Expand Down
2 changes: 1 addition & 1 deletion analytical_engine/core/grape_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class GrapeInstance : public Subscriber {
void Init(const std::string& vineyard_socket);

bl::result<std::shared_ptr<DispatchResult>> OnReceive(
const CommandDetail& cmd) override;
std::shared_ptr<CommandDetail> cmd) override;

private:
bl::result<rpc::graph::GraphDefPb> loadGraph(const rpc::GSParams& params);
Expand Down
225 changes: 63 additions & 162 deletions analytical_engine/core/io/property_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ template <typename Key, typename Value>
using ProtobufMap = ::google::protobuf::Map<Key, Value>;

using gs::rpc::AttrValue;
using gs::rpc::Chunk;
using gs::rpc::DataType;
using gs::rpc::GSParams;
using gs::rpc::LargeAttrValue;
using gs::rpc::OpDef;

using AttrMap = ProtobufMap<int, AttrValue>;
Expand All @@ -60,9 +62,6 @@ struct Vertex {
std::string protocol; // file/oss/numpy/pandas/vineyard
std::string values; // from location, vineyard or pandas

// The following fields are only needed when protocol is numpy/pandas
std::vector<std::pair<std::string, DataType>> properties;

std::string SerializeToString() const {
std::stringstream ss;
ss << "V ";
Expand All @@ -85,7 +84,6 @@ class Edge {
std::string load_strategy;
std::string protocol;
std::string values;
std::vector<std::pair<std::string, DataType>> properties;

std::string SerializeToString() const {
std::stringstream ss;
Expand Down Expand Up @@ -134,51 +132,50 @@ struct Graph {
};
} // namespace detail

inline void ParseLoader(std::string& protocol, std::string& values,
const AttrMap& attrs) {
protocol = attrs.at(rpc::PROTOCOL).s();
values = attrs.at(rpc::VALUES).s();
// Builds a detail::Vertex from `attrs` and `data` and appends it to
// graph->vertices.
//
// The label, vid and protocol are read from `attrs` under the keys rpc::LABEL,
// rpc::VID and rpc::PROTOCOL; `data` is stored unchanged as the vertex's
// values.
//
// NOTE(review): each key is looked up with attrs.at(); presumably this fails
// when a key is absent -- confirm callers always supply all three.
inline void ParseVertex(std::shared_ptr<detail::Graph>& graph,
const std::string& data, const AttrMap& attrs) {
auto vertex = std::make_shared<detail::Vertex>();
vertex->label = attrs.at(rpc::LABEL).s();
vertex->vid = attrs.at(rpc::VID).s();
vertex->protocol = attrs.at(rpc::PROTOCOL).s();
// The payload is passed in separately rather than read from the attribute map.
vertex->values = data;
graph->vertices.push_back(vertex);
}

inline detail::Edge::SubLabel ParseSubLabel(const AttrMap& attrs) {
inline void ParseEdge(std::shared_ptr<detail::Graph>& graph,
const std::string& data, const AttrMap& attrs) {
std::string label = attrs.at(rpc::LABEL).s();

bool has_edge_label = false;
if (!graph->edges.empty() && graph->edges.back()->label == label) {
has_edge_label = true;
}

auto edge =
has_edge_label ? graph->edges.back() : std::make_shared<detail::Edge>();
edge->label = label;

// sub_label: src_label / dst_label
detail::Edge::SubLabel sub_label;
sub_label.src_label = attrs.at(rpc::SRC_LABEL).s();
sub_label.dst_label = attrs.at(rpc::DST_LABEL).s();
sub_label.src_vid = attrs.at(rpc::SRC_VID).s();
sub_label.dst_vid = attrs.at(rpc::DST_VID).s();
sub_label.load_strategy = attrs.at(rpc::LOAD_STRATEGY).s();
sub_label.protocol = attrs.at(rpc::PROTOCOL).s();
sub_label.values = data;
edge->sub_labels.push_back(sub_label);

ParseLoader(sub_label.protocol, sub_label.values,
attrs.at(rpc::LOADER).func().attr());

return sub_label;
}

inline std::shared_ptr<detail::Vertex> ParseVertex(const AttrMap& attrs) {
auto vertex = std::make_shared<detail::Vertex>();
vertex->label = attrs.at(rpc::LABEL).s();
vertex->vid = attrs.at(rpc::VID).s();

ParseLoader(vertex->protocol, vertex->values,
attrs.at(rpc::LOADER).func().attr());
return vertex;
}

inline std::shared_ptr<detail::Edge> ParseEdge(const AttrMap& attrs) {
auto edge = std::make_shared<detail::Edge>();
edge->label = attrs.at(rpc::LABEL).s();

auto sub_label_defs = attrs.at(rpc::SUB_LABEL).list().func();
for (const auto& sub_label_def : sub_label_defs) {
edge->sub_labels.push_back(ParseSubLabel(sub_label_def.attr()));
if (!has_edge_label) {
graph->edges.push_back(edge);
}

return edge;
}

// The input string is the serialized bytes of an arrow::Table; this function
// splits the table into several smaller tables.
inline std::vector<std::string> SplitTable(const std::string& data, int num) {
inline void SplitTable(const std::string& data, int num,
std::vector<std::string>& sliced_bytes) {
sliced_bytes.resize(num);
std::shared_ptr<arrow::Buffer> buffer =
arrow::Buffer::Wrap(data.data(), data.size());
std::shared_ptr<arrow::Table> table;
Expand All @@ -195,149 +192,55 @@ inline std::vector<std::string> SplitTable(const std::string& data, int num) {
sliced_tables[i] = sliced;
cur += offset;
}
std::vector<std::string> sliced_bytes(num);
for (int i = 0; i < num; ++i) {
if (sliced_tables[i]->num_rows() > 0) {
std::shared_ptr<arrow::Buffer> out_buf;
vineyard::SerializeTable(sliced_tables[i], &out_buf);
sliced_bytes[i] = out_buf->ToString();
}
}
return sliced_bytes;
}

inline std::vector<AttrMap> DistributeLoader(const AttrMap& attrs, int num) {
std::vector<AttrMap> distributed_attrs(num);

inline void DistributeChunk(const rpc::Chunk& chunk, int num,
std::vector<rpc::Chunk>& distributed_chunk) {
distributed_chunk.resize(num);
const auto& attrs = chunk.attr();
std::string protocol = attrs.at(rpc::PROTOCOL).s();
std::vector<std::string> distributed_values;
const std::string& data = attrs.at(rpc::VALUES).s();
const std::string& data = chunk.buffer();
if (protocol == "pandas") {
distributed_values = SplitTable(data, num);
SplitTable(data, num, distributed_values);
} else {
distributed_values.resize(num, data);
}
for (int i = 0; i < num; ++i) {
distributed_attrs[i][rpc::PROTOCOL].set_s(protocol);
distributed_attrs[i][rpc::VALUES].set_s(std::move(distributed_values[i]));
}
return distributed_attrs;
}

inline std::vector<AttrMap> DistributeVertex(const AttrMap& attrs, int num) {
auto loader_name = attrs.at(rpc::LOADER).func().name();
auto loader_attr = attrs.at(rpc::LOADER).func().attr();
auto sliced_attrs = DistributeLoader(loader_attr, num);

std::vector<AttrMap> distributed_attrs(num);
for (int i = 0; i < num; ++i) {
rpc::NameAttrList* list = new rpc::NameAttrList();
list->set_name(loader_name);
list->mutable_attr()->swap(sliced_attrs[i]);
distributed_attrs[i][rpc::LOADER].set_allocated_func(list);
}
for (auto& pair : attrs) {
if (pair.first != rpc::LOADER) {
for (int i = 0; i < num; ++i) {
distributed_attrs[i][pair.first].CopyFrom(pair.second);
}
distributed_chunk[i].set_buffer(std::move(distributed_values[i]));
auto* attr = distributed_chunk[i].mutable_attr();
for (auto& pair : attrs) {
(*attr)[pair.first].CopyFrom(pair.second);
}
}

return distributed_attrs;
}

inline std::vector<AttrMap> DistributeSubLabel(const AttrMap& attrs, int num) {
auto loader_name = attrs.at(rpc::LOADER).func().name();
auto loader_attr = attrs.at(rpc::LOADER).func().attr();
auto sliced_attrs = DistributeLoader(loader_attr, num);

std::vector<AttrMap> distributed_attrs(num);
for (int i = 0; i < num; ++i) {
rpc::NameAttrList* list = new rpc::NameAttrList();
list->set_name(loader_name);
list->mutable_attr()->swap(sliced_attrs[i]);
distributed_attrs[i][rpc::LOADER].set_allocated_func(list);
}
for (auto& pair : attrs) {
if (pair.first != rpc::LOADER) {
for (int i = 0; i < num; ++i) {
distributed_attrs[i][pair.first].CopyFrom(pair.second);
}
}
}

return distributed_attrs;
}

inline std::vector<AttrMap> DistributeEdge(const AttrMap& attrs, int num) {
std::vector<AttrMap> distributed_attrs(num);
std::vector<std::pair<std::string, std::vector<AttrMap>>>
named_distributed_sub_labels;
auto sub_label_defs = attrs.at(rpc::SUB_LABEL).list().func();
for (const auto& sub_label_def : sub_label_defs) {
auto sub_label_attrs = DistributeSubLabel(sub_label_def.attr(), num);
named_distributed_sub_labels.emplace_back(sub_label_def.name(),
std::move(sub_label_attrs));
}

for (int i = 0; i < num; ++i) {
for (auto& pair : named_distributed_sub_labels) {
rpc::NameAttrList* func =
distributed_attrs[i][rpc::SUB_LABEL].mutable_list()->add_func();
func->set_name(pair.first);
func->mutable_attr()->swap(pair.second[i]);
}
}
for (auto& pair : attrs) {
if (pair.first != rpc::SUB_LABEL) {
for (int i = 0; i < num; ++i) {
distributed_attrs[i][pair.first].CopyFrom(pair.second);
}
}
}
return distributed_attrs;
}

// If the input contains contents from numpy or pandas, we should distribute
// those raw bytes evenly across all workers so that each worker only receives
// a slice, in order to reduce the communication overhead.
inline std::vector<std::map<int, rpc::AttrValue>> DistributeGraph(
const std::map<int, rpc::AttrValue>& params, int num) {
std::vector<std::map<int, rpc::AttrValue>> distributed_graph(num);
// Initialize empty property definition, maybe filled afterwards.
for (int i = 0; i < num; ++i) {
distributed_graph[i][rpc::ARROW_PROPERTY_DEFINITION] = rpc::AttrValue();
}

if (params.find(rpc::ARROW_PROPERTY_DEFINITION) != params.end()) {
auto items = params.at(rpc::ARROW_PROPERTY_DEFINITION).list().func();
std::vector<std::pair<std::string, std::vector<AttrMap>>> named_items;

for (const auto& item : items) {
std::vector<AttrMap> vec;
if (item.name() == "vertex") {
vec = DistributeVertex(item.attr(), num);
} else if (item.name() == "edge") {
vec = DistributeEdge(item.attr(), num);
}
named_items.emplace_back(item.name(), std::move(vec));
inline std::vector<rpc::LargeAttrValue> DistributeGraph(
const rpc::LargeAttrValue& large_attr, int num) {
std::vector<rpc::LargeAttrValue> distributed_graph(num);
if (large_attr.has_chunk_list()) {
size_t chunk_list_size = large_attr.chunk_list().items().size();
std::vector<std::vector<rpc::Chunk>> distributed_vec(chunk_list_size);
// split
for (size_t i = 0; i < chunk_list_size; ++i) {
DistributeChunk(large_attr.chunk_list().items(i), num,
distributed_vec[i]);
}
for (int i = 0; i < num; ++i) {
for (auto& pair : named_items) {
rpc::NameAttrList* func =
distributed_graph[i][rpc::ARROW_PROPERTY_DEFINITION]
.mutable_list()
->add_func();
func->set_name(pair.first);
func->mutable_attr()->swap(pair.second[i]);
}
}
}
for (auto& pair : params) {
if (pair.first != rpc::ARROW_PROPERTY_DEFINITION) {
for (int i = 0; i < num; ++i) {
distributed_graph[i][pair.first].CopyFrom(pair.second);
for (auto& vec : distributed_vec) {
rpc::Chunk* chunk =
distributed_graph[i].mutable_chunk_list()->add_items();
chunk->Swap(&vec[i]);
}
}
}
Expand All @@ -346,22 +249,20 @@ inline std::vector<std::map<int, rpc::AttrValue>> DistributeGraph(

inline bl::result<std::shared_ptr<detail::Graph>> ParseCreatePropertyGraph(
const GSParams& params) {
BOOST_LEAF_AUTO(list, params.Get<rpc::AttrValue_ListValue>(
rpc::ARROW_PROPERTY_DEFINITION));
BOOST_LEAF_AUTO(directed, params.Get<bool>(rpc::DIRECTED));
BOOST_LEAF_AUTO(generate_eid, params.Get<bool>(rpc::GENERATE_EID));

auto& items = list.func();
auto graph = std::make_shared<detail::Graph>();

graph->directed = directed;
graph->generate_eid = generate_eid;

for (const auto& item : items) {
if (item.name() == "vertex") {
graph->vertices.push_back(ParseVertex(item.attr()));
} else if (item.name() == "edge") {
graph->edges.push_back(ParseEdge(item.attr()));
const auto& large_attr = params.GetLargeAttr();
for (const auto& item : large_attr.chunk_list().items()) {
const auto& chunk_attr = item.attr();
if (chunk_attr.at(rpc::CHUNK_NAME).s() == "vertex") {
ParseVertex(graph, item.buffer(), chunk_attr);
} else if (chunk_attr.at(rpc::CHUNK_NAME).s() == "edge") {
ParseEdge(graph, item.buffer(), chunk_attr);
}
}
return graph;
Expand Down
Loading

0 comments on commit 10f2580

Please sign in to comment.