Skip to content

Commit

Permalink
follow comments
Browse files Browse the repository at this point in the history
  • Loading branch information
chengduoZH committed Apr 25, 2018
1 parent 7ee07df commit ea78be2
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 63 deletions.
28 changes: 15 additions & 13 deletions paddle/fluid/framework/details/broadcast_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void BroadcastOpHandle::RunImpl() {

Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);

if (!use_nccl_ || platform::is_cpu_place(in_tensor.place())) {
if (platform::is_cpu_place(in_tensor.place())) {
for (auto *out : out_var_handles) {
if (*out == *in_var_handle) {
continue;
Expand All @@ -72,7 +72,7 @@ void BroadcastOpHandle::RunImpl() {
auto dev_ctx = dev_ctxes_.at(out_p);
RunAndRecordEvent(out_p, [in_tensor, out_var, dev_ctx, out_p] {
paddle::framework::TensorCopy(
in_tensor, out_p, *(dev_ctx),
in_tensor, out_p, *dev_ctx,
&VariableVisitor::GetMutableTensor(out_var));
});
}
Expand All @@ -81,22 +81,24 @@ void BroadcastOpHandle::RunImpl() {
PADDLE_ENFORCE(platform::is_gpu_place(in_tensor.place()));
VarHandle *out_handle;
int root = boost::get<platform::CUDAPlace>(in_tensor.place()).device;
std::vector<std::function<void()>> all_reduce_calls;
std::vector<std::function<void()>> broadcast_calls;

for (size_t j = 0; j < out_var_handles.size(); ++j) {
auto *out = out_var_handles[j];
auto *out_var = var_scopes.at(out->scope_idx_)->FindVar(out->name_);
VarHandle *out_var_handle = out_var_handles[j];
Variable *out_var = var_scopes.at(out_var_handle->scope_idx_)
->FindVar(out_var_handle->name_);

if (*out != *in_var_handle) {
if (*out_var_handle != *in_var_handle) {
PADDLE_ENFORCE_NOT_NULL(out_var);
PADDLE_ENFORCE_EQ(out->place_.which(), in_tensor.place().which(),
PADDLE_ENFORCE_EQ(out_var_handle->place_.which(),
in_tensor.place().which(),
"Places must be all on CPU or all on CUDA.");
VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(
out->place_, in_tensor.type());
out_var_handle->place_, in_tensor.type());
}

auto out_p = out->place_;
auto out_p = out_var_handle->place_;
int dev_id = boost::get<platform::CUDAPlace>(out_p).device;

auto &nccl_ctx = nccl_ctxs_->at(dev_id);
Expand All @@ -106,15 +108,15 @@ void BroadcastOpHandle::RunImpl() {
void *send_recv_buffer = nullptr;
if (root == dev_id) {
send_recv_buffer = const_cast<void *>(in_tensor.data<void>());
out_handle = out;
out_handle = out_var_handle;
} else {
send_recv_buffer =
VariableVisitor::GetMutableTensor(out_var).mutable_data(
out->place_);
out_var_handle->place_);
}

int type = platform::ToNCCLDataType(in_tensor.type());
all_reduce_calls.emplace_back([=] {
broadcast_calls.emplace_back([=] {
PADDLE_ENFORCE(platform::dynload::ncclBcast(
send_recv_buffer, in_tensor.numel(),
static_cast<ncclDataType_t>(type), root, comm, stream));
Expand All @@ -124,7 +126,7 @@ void BroadcastOpHandle::RunImpl() {
this->RunAndRecordEvent([&] {
{
platform::NCCLGroupGuard guard;
for (auto &call : all_reduce_calls) {
for (auto &call : broadcast_calls) {
call();
}
}
Expand Down
12 changes: 4 additions & 8 deletions paddle/fluid/framework/details/broadcast_op_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,9 @@ struct BroadcastOpHandle : public OpHandleBase {
public:
#ifdef PADDLE_WITH_CUDA
BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, bool use_nccl,
const std::vector<platform::Place> &places,
const platform::NCCLContextMap *nccl_ctxs)
: local_scopes_(local_scopes),
places_(places),
use_nccl_(use_nccl),
nccl_ctxs_(nccl_ctxs) {
: local_scopes_(local_scopes), places_(places), nccl_ctxs_(nccl_ctxs) {
if (nccl_ctxs_) {
for (auto &p_ctx : nccl_ctxs_->contexts_) {
dev_ctxes_[platform::CUDAPlace(p_ctx.first)] = p_ctx.second.ctx_.get();
Expand All @@ -50,8 +47,8 @@ struct BroadcastOpHandle : public OpHandleBase {
}
#else
BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, bool use_nccl)
: local_scopes_(local_scopes), places_(places), use_nccl_(use_nccl) {}
const std::vector<platform::Place> &places)
: local_scopes_(local_scopes), places_(places) {}
#endif

std::string Name() const override;
Expand All @@ -65,7 +62,6 @@ struct BroadcastOpHandle : public OpHandleBase {
private:
const std::vector<Scope *> &local_scopes_;
const std::vector<platform::Place> &places_;
bool use_nccl_;
#ifdef PADDLE_WITH_CUDA
const platform::NCCLContextMap *nccl_ctxs_;
#endif
Expand Down
16 changes: 8 additions & 8 deletions paddle/fluid/framework/details/broadcast_op_handle_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,19 @@ struct TestBroadcastOpHandle {
}
param_scopes_[input_scope_idx]->Var("input");

#ifdef PADDLE_WITH_CUDA
op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_, use_gpu_,
nccl_ctxs_.get()));
#endif

if (use_gpu_) {
#ifndef PADDLE_WITH_CUDA
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(
new BroadcastOpHandle(local_scopes_, gpu_list_, nccl_ctxs_.get()));
#else
PADDLE_THROW("CUDA is not support.");
#endif
} else {
#ifndef PADDLE_WITH_CUDA
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(
new BroadcastOpHandle(local_scopes_, gpu_list_, use_gpu_));
new BroadcastOpHandle(local_scopes_, gpu_list_, nccl_ctxs_.get()));
#else
op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_));
#endif
}

Expand Down
5 changes: 2 additions & 3 deletions paddle/fluid/framework/details/multi_devices_graph_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,9 @@ void MultiDevSSAGraphBuilder::CreateBroadcastOp(SSAGraph *result,
const std::string &p_name,
size_t dev_id) const {
#ifdef PADDLE_WITH_CUDA
auto *op_handle =
new BroadcastOpHandle(local_scopes_, places_, true, nccl_ctxs_);
auto *op_handle = new BroadcastOpHandle(local_scopes_, places_, nccl_ctxs_);
#else
auto *op_handle = new BroadcastOpHandle(local_scopes_, places_, false);
auto *op_handle = new BroadcastOpHandle(local_scopes_, places_);
#endif

result->ops_.emplace_back(op_handle);
Expand Down
15 changes: 8 additions & 7 deletions paddle/fluid/framework/details/reduce_op_handle_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,18 @@ struct TestReduceOpHandle {
}
param_scopes_[out_scope_idx]->Var("out");

#ifdef PADDLE_WITH_CUDA
op_handle_.reset(
new ReduceOpHandle(local_scopes_, gpu_list_, nccl_ctxs_.get()));
#endif

if (use_gpu_) {
#ifndef PADDLE_WITH_CUDA
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(
new ReduceOpHandle(local_scopes_, gpu_list_, nccl_ctxs_.get()));
#else
PADDLE_THROW("CUDA is not support.");
#endif
} else {
#ifndef PADDLE_WITH_CUDA
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(
new ReduceOpHandle(local_scopes_, gpu_list_, nccl_ctxs_.get()));
#else
op_handle_.reset(new ReduceOpHandle(local_scopes_, gpu_list_));
#endif
}
Expand Down
5 changes: 1 addition & 4 deletions paddle/fluid/framework/details/var_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ struct VarHandle : public VarHandleBase {
o.scope_idx_ == scope_idx_;
}

bool operator!=(const VarHandle& o) const {
return o.generated_op_ != generated_op_ || o.name_ != name_ ||
o.scope_idx_ != scope_idx_;
}
bool operator!=(const VarHandle& o) const { return !this->operator==(o); }
};

// Dummy Variable. It is used to represent dependencies between operators
Expand Down
10 changes: 9 additions & 1 deletion python/paddle/fluid/parallel_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,17 @@ def __init__(self,
training.
allow_op_delay(bool, default False): Whether to delay and buffer
some operators together for scheduling or not, which may
improve performance in some cases, defalut False.
improve performance in some cases, default False.
share_vars_from(ParallelExecutor, default None): If provied,
it will share variables from the specified ParallelExecutor.
use_nccl_allreduce(bool, default True): Whether to use nccl_allreduce
or not, if set True, the communication between different
devices by nccl allReduce, which doesn't support updating sparse
parameter, if set False, the communication between different
devices by reduce_op and broadcast_op, which will distribute all
the parameter gradients evenly to different device and updates
the parameters, and finally broadcast to other device, this method
support updating sparse parameter. Default True.
Returns:
A ParallelExecutor object.
Expand Down

0 comments on commit ea78be2

Please sign in to comment.