Feature/insert reduce_op to parallel exe #10096

Closed
Changes from 23 commits (26 commits in total):

7995380  Merge branch 'feature/refine_gather_reduce' of https://github.com/che…  (chengduoZH, Apr 18, 2018)
a6715d6  clean code  (chengduoZH, Apr 18, 2018)
ac7b414  fix VisitVariable  (chengduoZH, Apr 18, 2018)
9c8b9c0  Merge branch 'feature/add_reduce_op_handle' of https://github.com/che…  (chengduoZH, Apr 19, 2018)
bd11523  insert reduce op handle to parallel_exe  (chengduoZH, Apr 19, 2018)
9f45c06  fix API  (chengduoZH, Apr 19, 2018)
cecebbe  debug  (chengduoZH, Apr 19, 2018)
01b0106  merge feature/add_reduce_op_handle_Debug  (chengduoZH, Apr 19, 2018)
dee077b  unit test can work  (chengduoZH, Apr 19, 2018)
c741aeb  clean code  (chengduoZH, Apr 19, 2018)
fca0fb9  debug  (chengduoZH, Apr 20, 2018)
0561fae  debug  (chengduoZH, Apr 20, 2018)
a22d385  Polish MultiDevicesBuilder  (reyoung, Apr 20, 2018)
ed13f4a  Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…  (chengduoZH, Apr 20, 2018)
e0f37f8  fix reduce op, broadcast op and gather op  (chengduoZH, Apr 20, 2018)
8d83914  fix the threads per dev  (chengduoZH, Apr 20, 2018)
cec94e1  Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…  (chengduoZH, Apr 21, 2018)
3210055  code refine  (chengduoZH, Apr 22, 2018)
20ba594  add nccl broadcast  (chengduoZH, Apr 22, 2018)
bbad887  clean code  (chengduoZH, Apr 23, 2018)
f965e9a  Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…  (chengduoZH, Apr 24, 2018)
7b58d47  Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…  (chengduoZH, Apr 24, 2018)
7ee07df  follow comments  (chengduoZH, Apr 24, 2018)
ea78be2  follow comments  (chengduoZH, Apr 24, 2018)
ed052f1  Merge develop branch and fix CPPLint issues  (chengduoZH, May 2, 2018)
c0a3746  Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…  (chengduoZH, May 2, 2018)

paddle/fluid/framework/details/CMakeLists.txt (4 changes: 3 additions, 1 deletion)

@@ -15,12 +15,14 @@ if(WITH_GPU)
dynload_cuda)
set(multi_devices_graph_builder_deps nccl_all_reduce_op_handle)
nv_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim dynload_cuda)
nv_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor dynload_cuda)

else()
set(multi_devices_graph_builder_deps)
cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim)
cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
endif()

cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)

cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS ssa_graph_builder computation_op_handle

paddle/fluid/framework/details/broadcast_op_handle.cc (106 changes: 85 additions, 21 deletions)

@@ -19,11 +19,9 @@
namespace paddle {
namespace framework {
namespace details {
BroadcastOpHandle::BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places)
: local_scopes_(local_scopes), places_(places) {}

void BroadcastOpHandle::RunImpl() {
if (places_.size() == 1) return;
// the input and output may have dummy var.
VarHandle *in_var_handle;

@@ -55,27 +53,93 @@ void BroadcastOpHandle::RunImpl() {

Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);

for (auto *out : out_var_handles) {
if (*out == *in_var_handle) {
continue;
if (!use_nccl_ || platform::is_cpu_place(in_tensor.place())) {
for (auto *out : out_var_handles) {
if (*out == *in_var_handle) {
continue;
}

auto &out_p = out->place_;
auto *out_var = var_scopes.at(out->scope_idx_)->FindVar(out->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);
PADDLE_ENFORCE_EQ(out_p.which(), in_tensor.place().which(),
"Places must be all on CPU or all on CUDA.");

VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(out_p,
in_tensor.type());

auto dev_ctx = dev_ctxes_.at(out_p);
RunAndRecordEvent(out_p, [in_tensor, out_var, dev_ctx, out_p] {
paddle::framework::TensorCopy(
in_tensor, out_p, *(dev_ctx),

Reviewer: no need for () for dev_ctx?
Author (chengduoZH): Have removed.

&VariableVisitor::GetMutableTensor(out_var));
});
}
} else {
#ifdef PADDLE_WITH_CUDA
PADDLE_ENFORCE(platform::is_gpu_place(in_tensor.place()));
VarHandle *out_handle;
int root = boost::get<platform::CUDAPlace>(in_tensor.place()).device;
std::vector<std::function<void()>> all_reduce_calls;

for (size_t j = 0; j < out_var_handles.size(); ++j) {
auto *out = out_var_handles[j];
auto *out_var = var_scopes.at(out->scope_idx_)->FindVar(out->name_);

if (*out != *in_var_handle) {

Reviewer: Are you comparing VarHandle instead of VarHandle*? Is it correct? Hopefully, we don't use "auto" everywhere.
Author (chengduoZH):
> Are you comparing VarHandle instead of VarHandle*?
Line 90 compares the instances, not the pointers.
> Is it correct?
The pointers of the input (in_var_handle) and the output (out) must be different, because the output VarHandle is always newly created (see code).
> Hopefully, we don't use "auto" everywhere.
Thanks, I will correct that later.
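
The distinction can be shown with a minimal standalone sketch (a hypothetical VarHandleLike struct, not the actual Paddle class): dereferencing the pointers and using == compares the handles' contents, while comparing the raw pointers only checks whether they are the same object, which is never the case here because the output handle is newly created.

#include <cassert>
#include <string>

// Hypothetical stand-in for VarHandle; equality is defined on the fields.
struct VarHandleLike {
  std::string name;
  size_t scope_idx;
  bool operator==(const VarHandleLike &o) const {
    return name == o.name && scope_idx == o.scope_idx;
  }
  bool operator!=(const VarHandleLike &o) const { return !(*this == o); }
};

int main() {
  VarHandleLike in{"input", 0};
  VarHandleLike out{"input", 0};  // a distinct object with the same contents
  const VarHandleLike *p_in = &in;
  const VarHandleLike *p_out = &out;

  assert(*p_out == *p_in);  // like *out == *in_var_handle: compares the instances
  assert(p_out != p_in);    // comparing the pointers: always unequal here,
                            // since the output VarHandle is a new object
  return 0;
}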

PADDLE_ENFORCE_NOT_NULL(out_var);
PADDLE_ENFORCE_EQ(out->place_.which(), in_tensor.place().which(),
"Places must be all on CPU or all on CUDA.");
VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(
out->place_, in_tensor.type());
}

auto out_p = out->place_;
int dev_id = boost::get<platform::CUDAPlace>(out_p).device;

auto &nccl_ctx = nccl_ctxs_->at(dev_id);
auto stream = nccl_ctx.stream();
auto comm = nccl_ctx.comm_;

void *send_recv_buffer = nullptr;
if (root == dev_id) {
send_recv_buffer = const_cast<void *>(in_tensor.data<void>());
out_handle = out;
} else {
send_recv_buffer =
VariableVisitor::GetMutableTensor(out_var).mutable_data(
out->place_);
}

int type = platform::ToNCCLDataType(in_tensor.type());
all_reduce_calls.emplace_back([=] {

Reviewer: it is "broadcast", not "all_reduce"?
Author (chengduoZH): Yes, thanks!

PADDLE_ENFORCE(platform::dynload::ncclBcast(
send_recv_buffer, in_tensor.numel(),
static_cast<ncclDataType_t>(type), root, comm, stream));
});
}

auto &out_p = out->place_;
auto *out_var = var_scopes.at(out->scope_idx_)->FindVar(out->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);
PADDLE_ENFORCE_EQ(out_p.which(), in_var_handle->place_.which(),
"Places must be all on CPU or all on CUDA.");

VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(out_p,
in_tensor.type());

auto dev_ctx = dev_ctxes_.at(out_p);
RunAndRecordEvent(out_p, [in_tensor, out_var, dev_ctx, out_p] {
paddle::framework::TensorCopy(
in_tensor, out_p, *(dev_ctx),
&VariableVisitor::GetMutableTensor(out_var));
this->RunAndRecordEvent([&] {
{
platform::NCCLGroupGuard guard;
for (auto &call : all_reduce_calls) {
call();
}
}
if (*out_handle != *in_var_handle) {

Reviewer: What is out_handle? What is the usage of this code block? Can you add some comments to explain this block?
Author (chengduoZH): Broadcast sends the root's data to all of the other devices, so if the input and the output on the root device are different variables, we need to add the copy operation.
Reviewer: What is the root device, and what are the input and output of the root device?
Author (chengduoZH): Sorry, the description above is a bit obscure. I regard the source device, the one that sends the data to the other devices, as the root node. broadcast_op sends the input data to all of the devices, so one of the outputs must be on the same device as the input; if its value (Tensor or SelectedRows) is not the same variable as the input, we have to do the memory copy manually.

auto out_var = var_scopes.at(in_var_handle->scope_idx_)
->FindVar(out_var_handles[0]->name_);
paddle::framework::TensorCopy(
in_tensor, in_var_handle->place_,
*(dev_ctxes_.at(in_var_handle->place_)),
&VariableVisitor::GetMutableTensor(out_var));
}
});
#else
PADDLE_THROW("CUDA is not support.");
#endif
}
}
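
Taken together, the GPU branch above selects the device that already holds the input as the NCCL root, collects one ncclBcast call per output device, launches them inside a single NCCL group, and finally copies locally on the root if the root's output variable is not the input variable. Below is a simplified standalone model of that control flow; it uses only standard-library types, and the real CUDA/NCCL calls are replaced by placeholder comments.

#include <functional>
#include <iostream>
#include <vector>

int main() {
  const int root_dev = 1;                       // device that owns the input
  const std::vector<int> devices = {0, 1, 2, 3};
  const std::vector<float> in_tensor = {1.f, 2.f, 3.f};
  std::vector<std::vector<float>> out(devices.size());

  // Collect one call per device, as the loop over out_var_handles does.
  std::vector<std::function<void()>> broadcast_calls;
  for (int dev : devices) {
    broadcast_calls.emplace_back([&, dev] {
      // Placeholder for ncclBcast(send_recv_buffer, numel, type, root, ...):
      // every non-root device receives the root's data.
      if (dev != root_dev) out[dev] = in_tensor;
    });
  }

  // Placeholder for the NCCLGroupGuard-scoped group launch.
  for (auto &call : broadcast_calls) call();

  // On the root device the output variable may differ from the input
  // variable; in that case the data still has to be copied locally
  // (the TensorCopy after the group launch above).
  const bool root_out_is_input_var = false;  // mirrors *out_handle == *in_var_handle
  if (!root_out_is_input_var) out[root_dev] = in_tensor;

  for (int dev : devices)
    std::cout << "device " << dev << " received " << out[dev].size()
              << " elements\n";
  return 0;
}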

paddle/fluid/framework/details/broadcast_op_handle.h (27 changes: 26 additions, 1 deletion)

@@ -24,14 +24,35 @@
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/platform/device_context.h"

#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/nccl_helper.h"
#endif

namespace paddle {
namespace framework {
namespace details {

struct BroadcastOpHandle : public OpHandleBase {
public:
#ifdef PADDLE_WITH_CUDA
BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, bool use_nccl,
const platform::NCCLContextMap *nccl_ctxs)
: local_scopes_(local_scopes),
places_(places),
use_nccl_(use_nccl),
nccl_ctxs_(nccl_ctxs) {
if (nccl_ctxs_) {
for (auto &p_ctx : nccl_ctxs_->contexts_) {
dev_ctxes_[platform::CUDAPlace(p_ctx.first)] = p_ctx.second.ctx_.get();
}
}
}
#else
BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places);
const std::vector<platform::Place> &places, bool use_nccl)

Reviewer: use_nccl_ should be false here, so we don't need the use_nccl argument?
Author (chengduoZH): BroadcastOp can also broadcast CPU data, which cannot use NCCL.
Reviewer: Right, and that is why we don't need the use_nccl argument for the CPU case?
Author (chengduoZH): Thank you for pointing this out. I added this parameter in order to compare the performance of memory copying against ncclBcast; the result is that ncclBcast is faster, so use_nccl can be removed.

: local_scopes_(local_scopes), places_(places), use_nccl_(use_nccl) {}
#endif

std::string Name() const override;

@@ -44,6 +65,10 @@ struct BroadcastOpHandle : public OpHandleBase {
private:
const std::vector<Scope *> &local_scopes_;
const std::vector<platform::Place> &places_;
bool use_nccl_;
#ifdef PADDLE_WITH_CUDA
const platform::NCCLContextMap *nccl_ctxs_;
#endif
};
} // namespace details
} // namespace framework

paddle/fluid/framework/details/broadcast_op_handle_test.cc (36 changes: 33 additions, 3 deletions)

@@ -35,15 +35,25 @@ struct TestBroadcastOpHandle {
std::unique_ptr<OpHandleBase> op_handle_;
std::vector<std::unique_ptr<VarHandleBase>> vars_;
std::vector<p::Place> gpu_list_;
bool use_gpu_;
#ifdef PADDLE_WITH_CUDA
std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
#endif

void WaitAll() {
for (size_t j = 0; j < ctxs_.size(); ++j) {
ctxs_[j]->Wait();
}
#ifdef PADDLE_WITH_CUDA
if (nccl_ctxs_) {
nccl_ctxs_->WaitAll();
}
#endif
}

void InitCtxOnGpu(bool use_gpu) {
if (use_gpu) {
use_gpu_ = use_gpu;
if (use_gpu_) {
#ifdef PADDLE_WITH_CUDA
int count = p::GetCUDADeviceCount();
if (count <= 1) {
@@ -57,6 +67,7 @@ struct TestBroadcastOpHandle {
gpu_list_.push_back(p);
ctxs_.emplace_back(new p::CUDADeviceContext(p));
}
nccl_ctxs_.reset(new platform::NCCLContextMap(gpu_list_));
#else
PADDLE_THROW("CUDA is not support.");
#endif
@@ -67,6 +78,9 @@
gpu_list_.push_back(p);
ctxs_.emplace_back(new p::CPUDeviceContext(p));
}
#ifdef PADDLE_WITH_CUDA
nccl_ctxs_.reset(nullptr);
#endif
}
}

@@ -82,7 +96,21 @@
}
param_scopes_[input_scope_idx]->Var("input");

op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_));
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_, use_gpu_,
nccl_ctxs_.get()));
#endif

if (use_gpu_) {
#ifndef PADDLE_WITH_CUDA

Reviewer: There are so many ifdefs here; can we improve this block of code?
(One possible consolidation is sketched below, after this if/else block.)

PADDLE_THROW("CUDA is not support.");
#endif
} else {
#ifndef PADDLE_WITH_CUDA
op_handle_.reset(
new BroadcastOpHandle(local_scopes_, gpu_list_, use_gpu_));
#endif
}
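
One way the #ifdef nesting questioned above could be reduced, sketched here only as a suggestion (it reuses the two constructors shown in this diff and is not part of the PR): keep a single preprocessor branch and move the use_gpu_ check into the non-CUDA arm.

#ifdef PADDLE_WITH_CUDA
  // CUDA build: always pass the NCCL context map.
  op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_, use_gpu_,
                                         nccl_ctxs_.get()));
#else
  // CPU-only build: requesting the GPU path is an error; otherwise use the
  // three-argument constructor.
  if (use_gpu_) {
    PADDLE_THROW("CUDA is not support.");
  }
  op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_, use_gpu_));
#endif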

auto* in_var_handle =
new VarHandle(1, input_scope_idx, "input", gpu_list_[input_scope_idx]);
@@ -97,7 +125,9 @@ struct TestBroadcastOpHandle {
op_handle_->AddInput(dummy_var_handle);

for (size_t j = 0; j < gpu_list_.size(); ++j) {
op_handle_->SetDeviceContext(gpu_list_[j], ctxs_[j].get());
if (!use_gpu_) {
op_handle_->SetDeviceContext(gpu_list_[j], ctxs_[j].get());
}
VarHandle* out_var_handle = new VarHandle(2, j, "out", gpu_list_[j]);
vars_.emplace_back(out_var_handle);
op_handle_->AddOutput(out_var_handle);

paddle/fluid/framework/details/gather_op_handle.cc (53 changes: 26 additions, 27 deletions)

@@ -25,6 +25,7 @@ GatherOpHandle::GatherOpHandle(const std::vector<Scope *> &local_scopes,
: local_scopes_(local_scopes), places_(places) {}

void GatherOpHandle::RunImpl() {
if (places_.size() == 1) return;
// the input and output may have dummy var.
auto in_var_handles = DynamicCast<VarHandle>(inputs_);

@@ -53,55 +54,53 @@ void GatherOpHandle::RunImpl() {
PADDLE_ENFORCE(pre_in_var->IsType<framework::SelectedRows>(),
"Currently, gather_op only can gather SelectedRows.");

auto pre_place = in_0_handle->place_;
PADDLE_ENFORCE_EQ(out_var_handle->place_.which(), pre_place.which(),
"The place of input and output should be the same.");

// Wait input done, this Wait is asynchronous operation
WaitInputVarGenerated(in_var_handles);

std::vector<int64_t> out_rows;
std::vector<Tensor> in_tensors;
std::vector<platform::Place> in_places;

auto &pre_in = pre_in_var->Get<framework::SelectedRows>();
auto &pre_in_value = pre_in_var->Get<framework::SelectedRows>();
// gather the inputs
for (auto *in_handle : in_var_handles) {
auto in_p = in_handle->place_;
in_places.push_back(in_p);
PADDLE_ENFORCE_EQ(in_p.which(), pre_place.which(),
"Places must be all on CPU or all on CUDA.");
auto *in_var =
var_scopes.at(in_handle->scope_idx_)->FindVar(in_handle->name_);
auto &in_sr = in_var->Get<framework::SelectedRows>();
PADDLE_ENFORCE_NOT_NULL(in_var);

auto &in_sr_value = in_var->Get<framework::SelectedRows>();

PADDLE_ENFORCE_EQ(in_sr.value().type(), pre_in.value().type(),
PADDLE_ENFORCE_EQ(in_sr_value.place().which(), pre_in_value.place().which(),
"Places must be all on CPU or all on GPU.");
PADDLE_ENFORCE_EQ(in_sr_value.value().type(), pre_in_value.value().type(),
"The type of input is not consistent.");
PADDLE_ENFORCE_EQ(pre_in.height(), in_sr.height(),
PADDLE_ENFORCE_EQ(in_sr_value.height(), pre_in_value.height(),
"The height of inputs is not consistent.");
PADDLE_ENFORCE_EQ(pre_in.GetCompleteDims(), in_sr.GetCompleteDims(),
PADDLE_ENFORCE_EQ(in_sr_value.GetCompleteDims(),
pre_in_value.GetCompleteDims(),
"The dims of inputs is not consistent.");

auto &in_sr_rows = in_sr.rows();
auto &in_sr_rows = in_sr_value.rows();
out_rows.insert(out_rows.end(), in_sr_rows.begin(), in_sr_rows.end());

in_tensors.emplace_back(in_sr.value());
in_tensors.emplace_back(in_sr_value.value());
}

// write the output
auto &out_place = out_var_handle->place_;
auto out_scope_idx = out_var_handle->scope_idx_;
auto out_var = var_scopes.at(out_scope_idx)->FindVar(out_var_handle->name_);

auto out = out_var->GetMutable<framework::SelectedRows>();
out->set_height(pre_in.height());
out->set_rows(out_rows);
PADDLE_ENFORCE_EQ(out_place.which(), pre_in_value.place().which(),
"Places must be all on CPU or all on GPU.");
auto out_var =
var_scopes.at(out_var_handle->scope_idx_)->FindVar(out_var_handle->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);
auto out_value = out_var->GetMutable<framework::SelectedRows>();
out_value->set_height(pre_in_value.height());
out_value->set_rows(out_rows);
size_t rows = out_rows.size();
DDim out_dim = pre_in.GetCompleteDims();
DDim out_dim = pre_in_value.GetCompleteDims();
out_dim[0] = static_cast<int64_t>(rows);
out->mutable_value()->Resize(out_dim);
out->mutable_value()->mutable_data(out_place, pre_in.value().type());
Tensor *out_tensor = out->mutable_value();
out_value->mutable_value()->Resize(out_dim);
out_value->mutable_value()->mutable_data(out_place,
pre_in_value.value().type());
Tensor *out_tensor = out_value->mutable_value();

// copy
auto dev_ctx = dev_ctxes_[out_place];