Skip to content

Commit

Permalink
add nccl broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
chengduoZH committed Apr 22, 2018
1 parent 3210055 commit 20ba594
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 32 deletions.
8 changes: 5 additions & 3 deletions paddle/fluid/framework/details/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ if(WITH_GPU)
dynload_cuda)
set(multi_devices_graph_builder_deps nccl_all_reduce_op_handle)
nv_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim dynload_cuda)
nv_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor dynload_cuda)

else()
set(multi_devices_graph_builder_deps)
cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim)
cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
endif()

cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)

cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS ssa_graph_builder computation_op_handle
Expand All @@ -30,8 +32,8 @@ cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ssa_graph framewor
cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope
simple_threadpool device_context)

cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
device_context broadcast_op_handle)
#cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
# device_context broadcast_op_handle)
cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
device_context gather_op_handle)
cc_test(reduce_op_handle_test SRCS reduce_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
Expand Down
105 changes: 84 additions & 21 deletions paddle/fluid/framework/details/broadcast_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
namespace paddle {
namespace framework {
namespace details {
BroadcastOpHandle::BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places)
: local_scopes_(local_scopes), places_(places) {}

void BroadcastOpHandle::RunImpl() {
if (places_.size() == 1) return;
Expand Down Expand Up @@ -56,27 +53,93 @@ void BroadcastOpHandle::RunImpl() {

Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);

for (auto *out : out_var_handles) {
if (*out == *in_var_handle) {
continue;
if (!use_nccl_ || platform::is_cpu_place(in_tensor.place())) {
for (auto *out : out_var_handles) {
if (*out == *in_var_handle) {
continue;
}

auto &out_p = out->place_;
auto *out_var = var_scopes.at(out->scope_idx_)->FindVar(out->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);
PADDLE_ENFORCE_EQ(out_p.which(), in_tensor.place().which(),
"Places must be all on CPU or all on CUDA.");

VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(out_p,
in_tensor.type());

auto dev_ctx = dev_ctxes_.at(out_p);
RunAndRecordEvent(out_p, [in_tensor, out_var, dev_ctx, out_p] {
paddle::framework::TensorCopy(
in_tensor, out_p, *(dev_ctx),
&VariableVisitor::GetMutableTensor(out_var));
});
}
} else {
#ifdef PADDLE_WITH_CUDA
PADDLE_ENFORCE(platform::is_gpu_place(in_tensor.place()));
VarHandle *out_handle;
int root = boost::get<platform::CUDAPlace>(in_tensor.place()).device;
std::vector<std::function<void()>> all_reduce_calls;

for (size_t j = 0; j < out_var_handles.size(); ++j) {
auto *out = out_var_handles[j];
auto *out_var = var_scopes.at(out->scope_idx_)->FindVar(out->name_);

if (*out != *in_var_handle) {
PADDLE_ENFORCE_NOT_NULL(out_var);
PADDLE_ENFORCE_EQ(out->place_.which(), in_tensor.place().which(),
"Places must be all on CPU or all on CUDA.");
VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(
out->place_, in_tensor.type());
}

auto out_p = out->place_;
int dev_id = boost::get<platform::CUDAPlace>(out_p).device;

auto &nccl_ctx = nccl_ctxs_->at(dev_id);
auto stream = nccl_ctx.stream();
auto comm = nccl_ctx.comm_;

void *send_recv_buffer = nullptr;
if (root == dev_id) {
send_recv_buffer = const_cast<void *>(in_tensor.data<void>());
out_handle = out;
} else {
send_recv_buffer =
VariableVisitor::GetMutableTensor(out_var).mutable_data(
out->place_);
}

int type = platform::ToNCCLDataType(in_tensor.type());
all_reduce_calls.emplace_back([=] {
PADDLE_ENFORCE(platform::dynload::ncclBcast(
send_recv_buffer, in_tensor.numel(),
static_cast<ncclDataType_t>(type), root, comm, stream));
});
}

auto &out_p = out->place_;
auto *out_var = var_scopes.at(out->scope_idx_)->FindVar(out->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);
PADDLE_ENFORCE_EQ(out_p.which(), in_tensor.place().which(),
"Places must be all on CPU or all on CUDA.");

VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(out_p,
in_tensor.type());

auto dev_ctx = dev_ctxes_.at(out_p);
RunAndRecordEvent(out_p, [in_tensor, out_var, dev_ctx, out_p] {
paddle::framework::TensorCopy(
in_tensor, out_p, *(dev_ctx),
&VariableVisitor::GetMutableTensor(out_var));
this->RunAndRecordEvent([&] {
{
platform::NCCLGroupGuard guard;
for (auto &call : all_reduce_calls) {
call();
}
}
if (*out_handle != *in_var_handle) {
auto out_var = var_scopes.at(in_var_handle->scope_idx_)
->FindVar(out_var_handles[0]->name_);
paddle::framework::TensorCopy(
in_tensor, in_var_handle->place_,
*(dev_ctxes_.at(in_var_handle->place_)),
&VariableVisitor::GetMutableTensor(out_var));
}
});
#else
PADDLE_THROW("CUDA is not support.");
#endif
}
}

Expand Down
27 changes: 26 additions & 1 deletion paddle/fluid/framework/details/broadcast_op_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,35 @@
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/platform/device_context.h"

#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/nccl_helper.h"
#endif

namespace paddle {
namespace framework {
namespace details {

struct BroadcastOpHandle : public OpHandleBase {
public:
#ifdef PADDLE_WITH_CUDA
BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, bool use_nccl,
const platform::NCCLContextMap *nccl_ctxs)
: local_scopes_(local_scopes),
places_(places),
use_nccl_(use_nccl),
nccl_ctxs_(nccl_ctxs) {
if (nccl_ctxs_) {
for (auto &p_ctx : nccl_ctxs_->contexts_) {
dev_ctxes_[platform::CUDAPlace(p_ctx.first)] = p_ctx.second.ctx_.get();
}
}
}
#else
BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places);
const std::vector<platform::Place> &places, bool use_nccl)
: local_scopes_(local_scopes), places_(places), use_nccl_(use_nccl) {}
#endif

std::string Name() const override;

Expand All @@ -44,6 +65,10 @@ struct BroadcastOpHandle : public OpHandleBase {
private:
const std::vector<Scope *> &local_scopes_;
const std::vector<platform::Place> &places_;
bool use_nccl_;
#ifdef PADDLE_WITH_CUDA
const platform::NCCLContextMap *nccl_ctxs_;
#endif
};
} // namespace details
} // namespace framework
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/framework/details/broadcast_op_handle_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ struct TestBroadcastOpHandle {
}
param_scopes_[input_scope_idx]->Var("input");

op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_));
op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_, false));

auto* in_var_handle =
new VarHandle(1, input_scope_idx, "input", gpu_list_[input_scope_idx]);
Expand Down
13 changes: 8 additions & 5 deletions paddle/fluid/framework/details/multi_devices_graph_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,13 @@ int MultiDevSSAGraphBuilder::GetOpDeviceID(
void MultiDevSSAGraphBuilder::CreateBroadcastOp(SSAGraph *result,
const std::string &p_name,
size_t dev_id) const {
auto *op_handle = new BroadcastOpHandle(local_scopes_, places_);
#ifdef PADDLE_WITH_CUDA
auto *op_handle =
new BroadcastOpHandle(local_scopes_, places_, true, nccl_ctxs_);
#else
auto *op_handle = new BroadcastOpHandle(local_scopes_, places_, false);
#endif

result->ops_.emplace_back(op_handle);
auto *in = result->vars_.at(dev_id).at(p_name).back().get();
op_handle->AddInput(in);
Expand All @@ -217,10 +223,7 @@ void MultiDevSSAGraphBuilder::CreateBroadcastOp(SSAGraph *result,
auto *out_var = new VarHandle(vars.size(), i, p_name, p);
vars.emplace_back(out_var);
op_handle->AddOutput(out_var);

#ifdef PADDLE_WITH_CUDA
op_handle->SetDeviceContext(p, nccl_ctxs_->DevCtx(p));
#else
#ifndef ADDLE_WITH_CUDA
op_handle->SetDeviceContext(p,
platform::DeviceContextPool::Instance().Get(p));
#endif
Expand Down
5 changes: 5 additions & 0 deletions paddle/fluid/framework/details/var_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ struct VarHandle : public VarHandleBase {
return o.generated_op_ == generated_op_ && o.name_ == name_ &&
o.scope_idx_ == scope_idx_;
}

bool operator!=(const VarHandle& o) const {
return o.generated_op_ != generated_op_ || o.name_ != name_ ||
o.scope_idx_ != scope_idx_;
}
};

// Dummy Variable. It is used to represent dependencies between operators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ def test_all(self):
pe = fluid.ParallelExecutor(
use_cuda=True,
loss_name=avg_cost.name,
num_threads_per_dev=2,
threads_per_dev=2,
use_nccl_allreduce=use_nccl_allreduce)

feeder = fluid.DataFeeder(
Expand Down

0 comments on commit 20ba594

Please sign in to comment.