Skip to content

Commit

Permalink
Merge pull request #7980 from typhoonzero/grpc_perf_conn_once
Browse files Browse the repository at this point in the history
Performance enhancement by reusing the connection
  • Loading branch information
helinwang committed Jan 30, 2018
2 parents c52c0d6 + 683c5a3 commit fbd5f68
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
27 changes: 17 additions & 10 deletions paddle/operators/send_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,32 @@ class SendOp : public framework::OperatorBase {

platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place);

auto client_var_name = Output("RPCClient");
PADDLE_ENFORCE_NOT_NULL(scope.FindVar(client_var_name),
"Can not find variable '%s' in the scope.",
client_var_name);
auto* client_var = scope.FindVar(client_var_name);
detail::RPCClient* rpc_client = client_var->GetMutable<detail::RPCClient>();

for (size_t i = 0; i < ins.size(); i++) {
VLOG(3) << "sending " << ins[i] << " to " << epmap[i];
client_.AsyncSendVariable(epmap[i], ctx, scope, ins[i]);
rpc_client->AsyncSendVariable(epmap[i], ctx, scope, ins[i]);
}
PADDLE_ENFORCE(client_.Wait());
PADDLE_ENFORCE(rpc_client->Wait());

for (auto& ep : endpoints) {
VLOG(3) << "batch barrier, ep: " << ep;
client_.AsyncSendBatchBarrier(ep);
rpc_client->AsyncSendBatchBarrier(ep);
}
PADDLE_ENFORCE(client_.Wait());
PADDLE_ENFORCE(rpc_client->Wait());

for (size_t i = 0; i < outs.size(); i++) {
VLOG(3) << "getting " << outs[i] << " from " << epmap[i];
client_.AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
}

PADDLE_ENFORCE(client_.Wait());
PADDLE_ENFORCE(rpc_client->Wait());
}

private:
mutable detail::RPCClient client_;
};

class SendOpMaker : public framework::OpProtoAndCheckerMaker {
Expand All @@ -73,6 +77,9 @@ class SendOpMaker : public framework::OpProtoAndCheckerMaker {
AddInput("X", "(Tensor) Input tensor to be sent").AsDuplicable();
AddOutput("Out", "(Tensor) Output tensor to be received from server")
.AsDuplicable();
// Declares the "RPCClient" output slot. The description is built from two
// adjacent string literals that the compiler concatenates, so the first piece
// must end with a space to keep the words "is" and "initialized" separate.
AddOutput("RPCClient",
          "(RPCClient) The RPC client object which is "
          "initialized at most once.");
AddComment(R"DOC(
Send operator
Expand Down
9 changes: 8 additions & 1 deletion python/paddle/v2/fluid/distribute_transpiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,18 @@ def transpile(self,
self.param_grad_ep_mapping[ep]["params"].append(param)
self.param_grad_ep_mapping[ep]["grads"].append(grad)

# Create the variable in the global block that is passed to the send op as its
# "RPCClient" output. The keyword must be spelled `persistable`; the previous
# misspelling meant the flag was never applied as intended.
# NOTE(review): presumably persistable so the client survives across runs
# and the connection is reused -- confirm.
rpc_client_var = program.global_block().create_var(
    name="RPC_CLIENT_VAR",
    persistable=True,
    dtype='float32',  # dtype and shape are not actually used
    shape=[0])

# create send_op
send_op = program.global_block().append_op(
type="send",
inputs={"X": send_inputs},
outputs={"Out": send_outputs},
outputs={"Out": send_outputs,
"RPCClient": rpc_client_var},
attrs={"endpoints": pserver_endpoints,
"epmap": eplist})
# step4
Expand Down

0 comments on commit fbd5f68

Please sign in to comment.