Skip to content

Commit

Permalink
tune logs (#11649)
Browse files Browse the repository at this point in the history
  • Loading branch information
gongweibao committed Jun 22, 2018
1 parent e45a555 commit dbca7f1
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 11 deletions.
15 changes: 13 additions & 2 deletions paddle/fluid/operators/distributed/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License. */

#include <limits>

#include "glog/logging.h" // For VLOG
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/operators/distributed/request_handler.h"
#include "paddle/fluid/platform/profiler.h"
Expand Down Expand Up @@ -75,6 +76,9 @@ bool GRPCClient::AsyncSendVar(const std::string& ep,
var_h.scope = p_scope;
var_h.name = var_name_val;
var_h.ctx = p_ctx;
var_h.method = "Send";

VLOG(3) << var_h.String() << " begin";

// stub context
SendProcessor* s = new SendProcessor(ch);
Expand Down Expand Up @@ -129,6 +133,9 @@ bool GRPCClient::AsyncGetVar(const std::string& ep,
var_h.scope = p_scope;
var_h.name = var_name_val;
var_h.ctx = p_ctx;
var_h.method = "Get";

VLOG(3) << var_h.String() << " begin";

// stub context
GetProcessor* s = new GetProcessor(ch);
Expand Down Expand Up @@ -172,6 +179,9 @@ bool GRPCClient::AsyncPrefetchVar(const std::string& ep,
var_h.scope = p_scope;
var_h.name = out_var_name_val;
var_h.ctx = p_ctx;
var_h.method = "Prefetch";

VLOG(3) << var_h.String() << " begin";

// stub context
GetProcessor* s = new GetProcessor(ch);
Expand Down Expand Up @@ -243,10 +253,11 @@ void GRPCClient::Proceed() {
GPR_ASSERT(ok);
PADDLE_ENFORCE(c);
if (c->status_.ok()) {
VLOG(3) << c->var_h_.String() << " process";
c->Process();
} else {
LOG(FATAL) << "var: " << c->var_h_.String()
<< " grpc error:" << c->status_.error_message();
LOG(FATAL) << c->var_h_.String()
<< " meets grpc error:" << c->status_.error_message();
}
delete c;
{
Expand Down
6 changes: 5 additions & 1 deletion paddle/fluid/operators/distributed/grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,18 @@ namespace operators {
namespace distributed {

// Bundles the identifying data of one RPC request so it can be carried
// around and printed in log messages.
struct VarHandle {
  // RPC endpoint.
  std::string ep;
  // Device context supplied by the caller; only stored here.
  const platform::DeviceContext* ctx;
  // Scope supplied by the caller; only stored here.
  const framework::Scope* scope;
  // Variable name.
  std::string name;
  // RPC method name.
  std::string method;

  // Returns a log-friendly description of the form
  // "<method> name:[<name>], ep:[<ep>]".
  std::string String() const {
    std::ostringstream s;
    // Emit the description exactly once. Streaming the method-less
    // "name:[...] ep:[...]" form as well would repeat the name and endpoint
    // in every log line.
    s << method << " name:[" << name << "], ep:[" << ep << "]";
    return s.str();
  }
};
Expand Down
28 changes: 20 additions & 8 deletions paddle/fluid/operators/distributed/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ class RequestBase {
virtual ~RequestBase() {}
virtual void Process() = 0;

std::string Status2String(const std::string& method) {
std::string status = "Process";
if (status_ == FINISH) {
status = "Finish";
}

std::ostringstream s;
s << method << " name:[" << GetReqName() << "]"
<< ", ep:[" << ctx_.peer() << "]"
<< " " << status << " using req_id:" << req_id_;
return s.str();
}

CallStatus Status() const {
std::lock_guard<std::mutex> l(status_mu_);
return status_;
Expand Down Expand Up @@ -272,7 +285,7 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name,
int req_id) {
std::unique_lock<std::mutex> lock(cq_mutex_);
if (is_shut_down_) {
VLOG(3) << "shutdown, do not TryToRegisterNewSendOne";
LOG(WARNING) << "shutdown, do not TryToRegisterNewSendOne";
return;
}

Expand Down Expand Up @@ -306,14 +319,14 @@ void AsyncGRPCServer::HandleRequest(
bool ok = false;

while (true) {
VLOG(3) << "HandleRequest " << rpc_name << " wait next";
VLOG(4) << "HandleRequest " << rpc_name << " wait next";
if (!cq->Next(&tag, &ok)) {
LOG(INFO) << "CompletionQueue " << rpc_name << " shutdown!";
break;
}

int req_id = static_cast<int>(reinterpret_cast<intptr_t>(tag));
VLOG(3) << "HandleRequest " << rpc_name << ", req_id:" << req_id
VLOG(4) << "HandleRequest " << rpc_name << ", req_id:" << req_id
<< " get next";

auto& reqs = rpc_reqs_[rpc_name];
Expand All @@ -324,22 +337,21 @@ void AsyncGRPCServer::HandleRequest(
base = reqs[req_id];
}

VLOG(3) << base->Status2String(rpc_name);

// reference:
// https://github.com/tensorflow/tensorflow/issues/5596
// https://groups.google.com/forum/#!topic/grpc-io/xftlRy-IQwM
// https://groups.google.com/forum/#!topic/grpc-io/ywATt88Ef_I
if (!ok) {
LOG(WARNING) << "completion queue:" << rpc_name
<< " recv no regular event:argument name["
<< base->GetReqName() << "]";
<< " recv no regular event"
<< " context:" << base->Status2String(rpc_name);
TryToRegisterNewOne(rpc_name, req_id);
delete base;
continue;
}

VLOG(3) << "queue id:" << rpc_name << ", req_id:" << req_id
<< ", status:" << base->Status();

switch (base->Status()) {
case PROCESS: {
base->Process();
Expand Down
4 changes: 4 additions & 0 deletions paddle/fluid/operators/distributed/variable_response.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
if (total_written + size_to_write > length) {
size_to_write = length - total_written;
}
// This log is useful for seeing the size of each internal block of an rpc.
VLOG(7) << "copy " << size_to_write << " data to CUDAPlace";
memory::Copy(boost::get<platform::CUDAPlace>(place),
reinterpret_cast<void*>(p), cpu, data, size_to_write,
gpu_dev_ctx.stream());
Expand Down Expand Up @@ -103,6 +105,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
}
// TODO(gongwb): can we avoid copy?
platform::CPUPlace cpu;
// This log is useful for seeing the size of each internal block of an rpc.
VLOG(7) << "copy " << size_to_write << " data to CPUPlace";
memory::Copy(cpu, reinterpret_cast<void*>(p), cpu, data, size_to_write);

p += size_to_write;
Expand Down

0 comments on commit dbca7f1

Please sign in to comment.