From dfe836c0fea750e3135ea53e81c9c776477ae7e2 Mon Sep 17 00:00:00 2001 From: tangwei Date: Tue, 30 Jul 2019 19:45:38 +0800 Subject: [PATCH 1/5] fix sync mode hang in transpiler, test=develop --- .../operators/distributed_ops/fetch_barrier_op.cc | 12 ++++++++---- paddle/fluid/operators/distributed_ops/recv_op.cc | 13 +++++++++---- .../operators/distributed_ops/send_barrier_op.cc | 13 ++++++++++--- paddle/fluid/operators/distributed_ops/send_op.cc | 10 ++++------ .../fluid/transpiler/distribute_transpiler.py | 4 ++-- 5 files changed, 33 insertions(+), 19 deletions(-) diff --git a/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc b/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc index 7275ab201f471..fce41c63c864a 100644 --- a/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc +++ b/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc @@ -40,13 +40,17 @@ class FetchBarrierOp : public framework::OperatorBase { distributed::RPCClient::GetInstance( Attr("trainer_id")); - PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); - + std::vector rets; for (auto& ep : eps) { VLOG(3) << "fetch barrier, ep: " << ep; - rpc_client->AsyncSendFetchBarrier(ep); + rets.push_back(rpc_client->AsyncSendFetchBarrier(ep)); + } + + for (size_t i = 0; i < rets.size(); i++) { + VLOG(7) << "before sync_fetch_barrier " << ins[i] << "from " << epmap[i]; + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + VLOG(7) << "after sync_fetch_barrier " << ins[i] << "from " << epmap[i]; } - PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); } }; diff --git a/paddle/fluid/operators/distributed_ops/recv_op.cc b/paddle/fluid/operators/distributed_ops/recv_op.cc index b871859dbb142..4a2e5f4f37138 100644 --- a/paddle/fluid/operators/distributed_ops/recv_op.cc +++ b/paddle/fluid/operators/distributed_ops/recv_op.cc @@ -73,10 +73,10 @@ class RecvOp : public framework::OperatorBase { rets.push_back( rpc_client->AsyncGetVar(epmap[i], ctx, scope, varname, outs[i])); } - if (sync_mode) { - for (size_t i = 0; i < rets.size(); i++) { - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); - } + for (size_t i = 0; i < rets.size(); i++) { + VLOG(7) << "before sync_recv " << ins[i] << "from " << epmap[i]; + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + VLOG(7) << "after sync_recv " << ins[i] << "from " << epmap[i]; } } else { std::vector rets; @@ -87,8 +87,13 @@ class RecvOp : public framework::OperatorBase { rets.push_back(rpc_client->AsyncGetVarNoBarrier(epmap[i], ctx, scope, varname, outs[i])); } + for (size_t i = 0; i < rets.size(); i++) { + VLOG(7) << "before sync_recv_nobarrier " << ins[i] << "from " + << epmap[i]; PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + VLOG(7) << "after sync_recv_nobarrier " << ins[i] << "from " + << epmap[i]; } } } diff --git a/paddle/fluid/operators/distributed_ops/send_barrier_op.cc b/paddle/fluid/operators/distributed_ops/send_barrier_op.cc index ae1b10c3b6c7b..7f7f7fa3066ee 100644 --- a/paddle/fluid/operators/distributed_ops/send_barrier_op.cc +++ b/paddle/fluid/operators/distributed_ops/send_barrier_op.cc @@ -44,12 +44,19 @@ class SendBarrierOp : public framework::OperatorBase { VLOG(3) << "SendBarrierOp sync"; - // need to wait before sending send_barrier message - PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); + std::vector rets; + for (auto& ep : eps) { VLOG(3) << "send barrier, ep: " << ep; - rpc_client->AsyncSendBatchBarrier(ep); + rets.push_back(rpc_client->AsyncSendBatchBarrier(ep)); } + + for (size_t i = 0; i < rets.size(); i++) { + VLOG(7) << "before sync_send_barrier " << ins[i] << "from " << epmap[i]; + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + VLOG(7) << "after sync_send_barrier " << ins[i] << "from " << epmap[i]; + } + PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); } }; diff --git a/paddle/fluid/operators/distributed_ops/send_op.cc b/paddle/fluid/operators/distributed_ops/send_op.cc index 5731bcc15a070..79685a2b338fd 100644 --- a/paddle/fluid/operators/distributed_ops/send_op.cc +++ b/paddle/fluid/operators/distributed_ops/send_op.cc @@ -75,12 +75,10 @@ class SendOp : public framework::OperatorBase { VLOG(3) << "don't send no-initialied variable: " << ins[i]; } } - if (sync_send) { - for (size_t i = 0; i < rets.size(); i++) { - VLOG(7) << "before sync_send " << ins[i] << "from " << epmap[i]; - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); - VLOG(7) << "after sync_send " << ins[i] << "from " << epmap[i]; - } + for (size_t i = 0; i < rets.size(); i++) { + VLOG(7) << "before sync_send " << ins[i] << "from " << epmap[i]; + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + VLOG(7) << "after sync_send " << ins[i] << "from " << epmap[i]; } } } diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 722531abe4be1..e34de6caf8dc4 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -575,7 +575,7 @@ def transpile(self, self.grad_name_to_param_name[grad_varname], splited_grad_varname ], - "sync_mode": not self.sync_mode, + "sync_mode": True, }) for _, var in enumerate(splited_vars): send_vars.append(var) @@ -670,7 +670,7 @@ def transpile(self, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE, OP_ROLE_VAR_ATTR_NAME: [param_varname, recv_op_role_var_name], - "sync_mode": not self.sync_mode + "sync_mode": True }) if self.sync_mode: From 10b76de7328d15feda09c212e97f38a20516a854 Mon Sep 17 00:00:00 2001 From: tangwei Date: Wed, 31 Jul 2019 14:32:25 +0800 Subject: [PATCH 2/5] fix sync mode hang in transpiler, test=develop --- paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc | 2 -- paddle/fluid/operators/distributed_ops/recv_op.cc | 4 ++-- paddle/fluid/operators/distributed_ops/send_barrier_op.cc | 4 ---- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc b/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc index fce41c63c864a..36c8f2e8929d8 100644 --- a/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc +++ b/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc @@ -47,9 +47,7 @@ class FetchBarrierOp : public framework::OperatorBase { } for (size_t i = 0; i < rets.size(); i++) { - VLOG(7) << "before sync_fetch_barrier " << ins[i] << "from " << epmap[i]; PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); - VLOG(7) << "after sync_fetch_barrier " << ins[i] << "from " << epmap[i]; } } }; diff --git a/paddle/fluid/operators/distributed_ops/recv_op.cc b/paddle/fluid/operators/distributed_ops/recv_op.cc index 4a2e5f4f37138..1245b104c4a54 100644 --- a/paddle/fluid/operators/distributed_ops/recv_op.cc +++ b/paddle/fluid/operators/distributed_ops/recv_op.cc @@ -89,10 +89,10 @@ class RecvOp : public framework::OperatorBase { } for (size_t i = 0; i < rets.size(); i++) { - VLOG(7) << "before sync_recv_nobarrier " << ins[i] << "from " + VLOG(7) << "before sync_recv_nobarrier " << outs[i] << "from " << epmap[i]; PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); - VLOG(7) << "after sync_recv_nobarrier " << ins[i] << "from " + VLOG(7) << "after sync_recv_nobarrier " << outs[i] << "from " << epmap[i]; } } diff --git a/paddle/fluid/operators/distributed_ops/send_barrier_op.cc b/paddle/fluid/operators/distributed_ops/send_barrier_op.cc index 7f7f7fa3066ee..309a85209fbd4 100644 --- a/paddle/fluid/operators/distributed_ops/send_barrier_op.cc +++ b/paddle/fluid/operators/distributed_ops/send_barrier_op.cc @@ -52,12 +52,8 @@ class SendBarrierOp : public framework::OperatorBase { } for (size_t i = 0; i < rets.size(); i++) { - VLOG(7) << "before sync_send_barrier " << ins[i] << "from " << epmap[i]; PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); - VLOG(7) << "after sync_send_barrier " << ins[i] << "from " << epmap[i]; } - - PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient"); } }; From 41d3102bf0517e9283f233d7b3b4d72a013962d5 Mon Sep 17 00:00:00 2001 From: tangwei Date: Wed, 7 Aug 2019 14:52:37 +0800 Subject: [PATCH 3/5] fix outs in recv op, test=develop --- paddle/fluid/operators/distributed_ops/recv_op.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/operators/distributed_ops/recv_op.cc b/paddle/fluid/operators/distributed_ops/recv_op.cc index 1245b104c4a54..70b935b25bbf2 100644 --- a/paddle/fluid/operators/distributed_ops/recv_op.cc +++ b/paddle/fluid/operators/distributed_ops/recv_op.cc @@ -74,9 +74,9 @@ class RecvOp : public framework::OperatorBase { rpc_client->AsyncGetVar(epmap[i], ctx, scope, varname, outs[i])); } for (size_t i = 0; i < rets.size(); i++) { - VLOG(7) << "before sync_recv " << ins[i] << "from " << epmap[i]; + VLOG(7) << "before sync_recv " << outs[i] << "from " << epmap[i]; PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); - VLOG(7) << "after sync_recv " << ins[i] << "from " << epmap[i]; + VLOG(7) << "after sync_recv " << outs[i] << "from " << epmap[i]; } } else { std::vector rets; From 8afc13da7bb19e08cf0ebf836dea3cf0adee1439 Mon Sep 17 00:00:00 2001 From: tangwei Date: Thu, 22 Aug 2019 11:52:16 +0800 Subject: [PATCH 4/5] remove sync mode in send/recv, test=develop --- .../operators/distributed_ops/recv_op.cc | 27 +++++-------------- .../operators/distributed_ops/send_op.cc | 5 ---- .../fluid/transpiler/distribute_transpiler.py | 8 ++---- 3 files changed, 9 insertions(+), 31 deletions(-) diff --git a/paddle/fluid/operators/distributed_ops/recv_op.cc b/paddle/fluid/operators/distributed_ops/recv_op.cc index 70b935b25bbf2..aecc7e28f5398 100644 --- a/paddle/fluid/operators/distributed_ops/recv_op.cc +++ b/paddle/fluid/operators/distributed_ops/recv_op.cc @@ -44,7 +44,7 @@ class RecvOp : public framework::OperatorBase { std::vector epmap = Attr>("epmap"); std::vector varnames = Attr>("varnames"); - int sync_mode = Attr("sync_mode"); + auto outs = Outputs("Out"); bool with_barrier = Attr("with_barrier"); @@ -64,8 +64,8 @@ class RecvOp : public framework::OperatorBase { trainer_id); recv_functor(rpc_ctx, scope); } else { + std::vector rets; if (with_barrier) { - std::vector rets; for (size_t i = 0; i < outs.size(); i++) { std::string varname = varnames.size() == 0 ? outs[i] : varnames[i]; VLOG(4) << "recv " << outs[i] << " from " << epmap[i] << " with " @@ -73,13 +73,7 @@ class RecvOp : public framework::OperatorBase { rets.push_back( rpc_client->AsyncGetVar(epmap[i], ctx, scope, varname, outs[i])); } - for (size_t i = 0; i < rets.size(); i++) { - VLOG(7) << "before sync_recv " << outs[i] << "from " << epmap[i]; - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); - VLOG(7) << "after sync_recv " << outs[i] << "from " << epmap[i]; - } } else { - std::vector rets; for (size_t i = 0; i < outs.size(); i++) { std::string varname = varnames.size() == 0 ? outs[i] : varnames[i]; VLOG(4) << "recv " << outs[i] << " from " << epmap[i] << " with " @@ -87,14 +81,11 @@ class RecvOp : public framework::OperatorBase { rets.push_back(rpc_client->AsyncGetVarNoBarrier(epmap[i], ctx, scope, varname, outs[i])); } - - for (size_t i = 0; i < rets.size(); i++) { - VLOG(7) << "before sync_recv_nobarrier " << outs[i] << "from " - << epmap[i]; - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); - VLOG(7) << "after sync_recv_nobarrier " << outs[i] << "from " - << epmap[i]; - } + } + for (size_t i = 0; i < rets.size(); i++) { + VLOG(7) << "before sync_recv " << outs[i] << "from " << epmap[i]; + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + VLOG(7) << "after sync_recv " << outs[i] << "from " << epmap[i]; } } } @@ -117,10 +108,6 @@ This operator can get variables from server side. "variables for mapping") .SetDefault({}); AddAttr("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0); - AddAttr("sync_mode", - "(int, default 0)" - "sync recv or async recv.") - .SetDefault(0); AddAttr("with_barrier", "(bool, default True) if with_barrier=False, will use " "AsyncGetVarNoBarrier get variable from pserver immediately") diff --git a/paddle/fluid/operators/distributed_ops/send_op.cc b/paddle/fluid/operators/distributed_ops/send_op.cc index 79685a2b338fd..838f7a3bbf5d5 100644 --- a/paddle/fluid/operators/distributed_ops/send_op.cc +++ b/paddle/fluid/operators/distributed_ops/send_op.cc @@ -41,7 +41,6 @@ class SendOp : public framework::OperatorBase { auto ins = Inputs("X"); auto epmap = Attr>("epmap"); - int sync_send = Attr("sync_mode"); auto trainer_id = Attr("trainer_id"); auto send_varnames = Attr>("send_varnames"); @@ -96,10 +95,6 @@ Send operator This operator will send variables to listen_and_serve op at the parameter server. )DOC"); - AddAttr("sync_mode", - "(int, default 0)" - "sync send or async send.") - .SetDefault(0); AddAttr("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0); AddAttr>("epmap", "(string vector, default 127.0.0.1:6164)" diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index e34de6caf8dc4..507d3c1ac498d 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -574,8 +574,7 @@ def transpile(self, OP_ROLE_VAR_ATTR_NAME: [ self.grad_name_to_param_name[grad_varname], splited_grad_varname - ], - "sync_mode": True, + ] }) for _, var in enumerate(splited_vars): send_vars.append(var) @@ -595,7 +594,6 @@ def transpile(self, outputs={"Out": send_barrier_out}, attrs={ "endpoints": pserver_endpoints, - "sync_mode": self.sync_mode, "trainer_id": self.trainer_id, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE }) @@ -669,8 +667,7 @@ def transpile(self, "trainer_id": self.trainer_id, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE, OP_ROLE_VAR_ATTR_NAME: - [param_varname, recv_op_role_var_name], - "sync_mode": True + [param_varname, recv_op_role_var_name] }) if self.sync_mode: @@ -1548,7 +1545,6 @@ def _split_table_grad_and_add_send_vars(self, program, pserver_endpoints): if self.sync_mode else [] }, attrs={ - "sync_mode": not self.sync_mode, "epmap": pserver_endpoints, "trainer_id": self.trainer_id, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE, From b2381e7ae670ce4a7b7c4d678e69e5d96d8e093b Mon Sep 17 00:00:00 2001 From: tangwei Date: Thu, 22 Aug 2019 15:46:03 +0800 Subject: [PATCH 5/5] replace PADDLE_ENFORCE with PADDLE_ENFORCE_NE, test=develop --- paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc | 2 +- paddle/fluid/operators/distributed_ops/recv_op.cc | 2 +- paddle/fluid/operators/distributed_ops/send_barrier_op.cc | 2 +- paddle/fluid/operators/distributed_ops/send_op.cc | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc b/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc index 36c8f2e8929d8..ae4b687ffc4c8 100644 --- a/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc +++ b/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc @@ -47,7 +47,7 @@ class FetchBarrierOp : public framework::OperatorBase { } for (size_t i = 0; i < rets.size(); i++) { - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + PADDLE_ENFORCE_NE(rets[i]->Wait(), 0U, "internal error in RPCClient"); } } }; diff --git a/paddle/fluid/operators/distributed_ops/recv_op.cc b/paddle/fluid/operators/distributed_ops/recv_op.cc index aecc7e28f5398..30a161fe2565e 100644 --- a/paddle/fluid/operators/distributed_ops/recv_op.cc +++ b/paddle/fluid/operators/distributed_ops/recv_op.cc @@ -84,7 +84,7 @@ class RecvOp : public framework::OperatorBase { } for (size_t i = 0; i < rets.size(); i++) { VLOG(7) << "before sync_recv " << outs[i] << "from " << epmap[i]; - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + PADDLE_ENFORCE_NE(rets[i]->Wait(), 0U, "internal error in RPCClient"); VLOG(7) << "after sync_recv " << outs[i] << "from " << epmap[i]; } } diff --git a/paddle/fluid/operators/distributed_ops/send_barrier_op.cc b/paddle/fluid/operators/distributed_ops/send_barrier_op.cc index 309a85209fbd4..558d0090d734b 100644 --- a/paddle/fluid/operators/distributed_ops/send_barrier_op.cc +++ b/paddle/fluid/operators/distributed_ops/send_barrier_op.cc @@ -52,7 +52,7 @@ class SendBarrierOp : public framework::OperatorBase { } for (size_t i = 0; i < rets.size(); i++) { - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + PADDLE_ENFORCE_NE(rets[i]->Wait(), 0U, "internal error in RPCClient"); } } }; diff --git a/paddle/fluid/operators/distributed_ops/send_op.cc b/paddle/fluid/operators/distributed_ops/send_op.cc index 838f7a3bbf5d5..acb25b17d563c 100644 --- a/paddle/fluid/operators/distributed_ops/send_op.cc +++ b/paddle/fluid/operators/distributed_ops/send_op.cc @@ -76,7 +76,7 @@ class SendOp : public framework::OperatorBase { } for (size_t i = 0; i < rets.size(); i++) { VLOG(7) << "before sync_send " << ins[i] << "from " << epmap[i]; - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + PADDLE_ENFORCE_NE(rets[i]->Wait(), 0U, "internal error in RPCClient"); VLOG(7) << "after sync_send " << ins[i] << "from " << epmap[i]; } }