Skip to content

Commit

Permalink
Add Lazy mode support to GraphScope (#340)
Browse files Browse the repository at this point in the history
* Add Lazy mode support to GraphScope

   Lazy evaluation is an important performance optimization technique that has been widely applied by big data processing systems such as TensorFlow. Compared with eager evaluation, it provides three-fold benefits.

   Eager VS Lazy:

Eager execution is a flexible platform for research and experimentation. It provides:
      - An intuitive interface: Quickly test on small data.
      - Easier debugging: Call ops directly to inspect running models and test changes.

Lazy execution means GraphScope does not process the data until it has to. It just gathers all the information we feed into it into a DAG, and processes it only when we execute `sess.run(fetches)`.
  • Loading branch information
lidongze0629 committed Jun 10, 2021
1 parent 65a0f4e commit e4d0ebd
Show file tree
Hide file tree
Showing 35 changed files with 2,676 additions and 1,223 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/ci.yml
Expand Up @@ -340,8 +340,9 @@ jobs:
export GS_TEST_DIR='/root/gstest'
python3 -m pytest --exitfirst -s -v python/graphscope/nx/tests/classes
python3 -m pytest --exitfirst -s -v python/graphscope/nx/tests/test_nx.py
python3 -m pytest --exitfirst -s -v python/graphscope/nx/tests/algorithms/builtin \
python/graphscope/nx/tests/test_utils.py
python3 -m pytest --exitfirst -s -v python/graphscope/nx/tests/test_ctx_builtin.py
python3 -m pytest --exitfirst -s -v python/graphscope/nx/tests/algorithms/builtin
python3 -m pytest --exitfirst -s -v python/graphscope/nx/tests/test_utils.py
pkill -TERM etcd || true
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/local.yml
Expand Up @@ -85,7 +85,7 @@ jobs:
- name: Install libvineyard
run: |
git clone -b v0.2.3 --single-branch --depth=1 https://github.com/alibaba/libvineyard.git
git clone -b main-v0.2.3 --single-branch --depth=1 https://github.com/alibaba/libvineyard.git
cd libvineyard
git submodule update --init
mkdir build && pushd build
Expand Down
4 changes: 4 additions & 0 deletions analytical_engine/core/grape_instance.cc
Expand Up @@ -897,6 +897,10 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
break;
}
case rpc::CREATE_APP: {
// do nothing
break;
}
case rpc::BIND_APP: {
BOOST_LEAF_AUTO(app_name, loadApp(params));
r->set_data(app_name);
break;
Expand Down
159 changes: 86 additions & 73 deletions analytical_engine/core/server/graphscope_service.cc
Expand Up @@ -13,6 +13,8 @@
* limitations under the License.
*/

#include <unordered_map>

#include "core/server/graphscope_service.h"

#include "core/server/rpc_utils.h"
Expand All @@ -34,91 +36,102 @@ ::grpc::Status GraphScopeService::RunStep(::grpc::ServerContext* context,
RunStepResponse* response) {
CHECK(request->has_dag_def());
const DagDef& dag_def = request->dag_def();
CHECK_EQ(dag_def.op().size(), 1);
const auto& op = dag_def.op(0);

CommandDetail cmd = OpToCmd(op);
auto result = dispatcher_->Dispatch(cmd);
auto policy = result[0].aggregate_policy();
bool success = true;
std::string error_msgs;

// First pass: make sure all result are valid and check the consistency
for (auto& e : result) {
auto ok = (e.error_code() == rpc::Code::OK);

if (ok) {
CHECK_EQ(e.aggregate_policy(), policy);
auto& graph_def = e.graph_def();

if (!graph_def.key().empty()) {
if (response->graph_def().key().empty()) {
response->mutable_graph_def()->CopyFrom(graph_def);
} else if (graph_def.SerializeAsString() !=
response->graph_def().SerializeAsString()) {
LOG(FATAL) << "BUG: Multiple workers return different graph def.";
}
}
} else {
error_msgs += e.message() + "\n";
}

success &= ok;
}

auto* res_status = response->mutable_status();
std::unordered_map<std::string, OpResult*> op_key_to_result;

if (!success) {
res_status->set_code(rpc::Code::ANALYTICAL_ENGINE_INTERNAL_ERROR);
res_status->set_error_msg(error_msgs);
OpDef* opdef = res_status->mutable_op();
opdef->CopyFrom(op);
}
for (const auto& op : dag_def.op()) {
OpResult* op_result = response->add_results();
op_result->set_key(op.key());
op_key_to_result.emplace(op.key(), op_result);
CommandDetail cmd = OpToCmd(op);

// Second pass: aggregate result according to the policy
switch (policy) {
case DispatchResult::AggregatePolicy::kPickFirst: {
response->mutable_result()->assign(result[0].data());
break;
}
case DispatchResult::AggregatePolicy::kPickFirstNonEmpty: {
for (auto& e : result) {
auto& data = e.data();
bool success = true;
std::string error_msgs;
auto result = dispatcher_->Dispatch(cmd);
auto policy = result[0].aggregate_policy();

if (!data.empty()) {
response->mutable_result()->assign(data.begin(), data.end());
break;
// First pass: make sure all result are valid and check the consistency
for (auto& e : result) {
auto ok = (e.error_code() == rpc::Code::OK);
if (ok) {
CHECK_EQ(e.aggregate_policy(), policy);
auto& graph_def = e.graph_def();

if (!graph_def.key().empty()) {
if (op_result->graph_def().key().empty()) {
op_result->mutable_graph_def()->CopyFrom(graph_def);
} else if (graph_def.SerializeAsString() !=
op_result->graph_def().SerializeAsString()) {
LOG(FATAL) << "BUG: Multiple workers return different graph def.";
}
}
} else {
error_msgs += e.message() + "\n";
op_result->set_code(e.error_code());
}
success &= ok;
}
break;
}
case DispatchResult::AggregatePolicy::kRequireConsistent: {
for (auto& e : result) {
auto& data = e.data();

if (response->result().empty()) {
response->mutable_result()->assign(data.begin(), data.end());
} else if (response->result() != data) {
std::stringstream ss;
if (!success) {
res_status->set_code(rpc::Code::ANALYTICAL_ENGINE_INTERNAL_ERROR);
res_status->set_error_msg(error_msgs);
op_result->set_error_msg(error_msgs);
// break dag exection flow
break;
}

ss << "Error: Multiple workers return different data."
<< " Current worker id: " << e.worker_id() << " " << data
<< " vs the previous: " << response->result();
// Second pass: aggregate result according to the policy
switch (policy) {
case DispatchResult::AggregatePolicy::kPickFirst: {
op_result->mutable_result()->assign(result[0].data());
break;
}
case DispatchResult::AggregatePolicy::kPickFirstNonEmpty: {
for (auto& e : result) {
auto& data = e.data();

res_status->set_code(rpc::Code::ANALYTICAL_ENGINE_INTERNAL_ERROR);
res_status->set_error_msg(ss.str());
LOG(ERROR) << ss.str();
break;
if (!data.empty()) {
op_result->mutable_result()->assign(data.begin(), data.end());
break;
}
}
break;
}
break;
}
case DispatchResult::AggregatePolicy::kConcat: {
for (auto& e : result) {
response->mutable_result()->append(e.data());
case DispatchResult::AggregatePolicy::kRequireConsistent: {
for (auto& e : result) {
auto& data = e.data();

if (op_result->result().empty()) {
op_result->mutable_result()->assign(data.begin(), data.end());
} else if (op_result->result() != data) {
std::stringstream ss;

ss << "Error: Multiple workers return different data."
<< " Current worker id: " << e.worker_id() << " " << data
<< " vs the previous: " << op_result->result();

op_result->set_code(rpc::Code::WORKER_RESULTS_INCONSISTENT_ERROR);
res_status->set_error_msg(ss.str());
LOG(ERROR) << ss.str();
success &= false;
break;
}
}
break;
}
case DispatchResult::AggregatePolicy::kConcat: {
for (auto& e : result) {
op_result->mutable_result()->append(e.data());
}
break;
}
}

if (!success) {
res_status->set_code(rpc::Code::ANALYTICAL_ENGINE_INTERNAL_ERROR);
// break dag exection flow
break;
}
break;
}
}

return ::grpc::Status::OK;
Expand Down
6 changes: 4 additions & 2 deletions analytical_engine/frame/project_frame.cc
Expand Up @@ -99,8 +99,10 @@ class ProjectSimpleFrame<
if (graph_def.has_extension()) {
graph_def.extension().UnpackTo(&vy_info);
}
vy_info.set_oid_type(PropertyTypeToPb(parent_meta.GetKeyValue("oid_type")));
vy_info.set_vid_type(PropertyTypeToPb(parent_meta.GetKeyValue("vid_type")));
vy_info.set_oid_type(PropertyTypeToPb(
vineyard::normalize_datatype(parent_meta.GetKeyValue("oid_type"))));
vy_info.set_vid_type(PropertyTypeToPb(
vineyard::normalize_datatype(parent_meta.GetKeyValue("vid_type"))));

std::string vdata_type, edata_type;
if (v_prop != "-1") {
Expand Down

0 comments on commit e4d0ebd

Please sign in to comment.