Address review comments
Signed-off-by: Nicolas V Castet <nvcastet@us.ibm.com>
nvcastet committed Jan 7, 2020
1 parent 45ec23d commit db24f30
Showing 11 changed files with 88 additions and 83 deletions.
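
In short, this commit does two things: it drops the redundant HOROVOD_BYTE dtype (its removed case arms simply mirrored or fell through to the HOROVOD_UINT8 handling), and it replaces the NCCLOp base class, whose subclasses picked their NCCL ranks by overriding a virtual PopulateNCCLCommStrategy hook, with an NCCLOpContext member configured by a Communicator value at construction time. The following is a minimal, self-contained sketch of that composition-over-inheritance pattern; the names and the stub rank/size values are simplified stand-ins, not the actual Horovod sources:

```cpp
#include <iostream>
#include <stdexcept>

enum Communicator { GLOBAL = 0, LOCAL = 1 };

// The communicator choice is plain data passed at construction,
// rather than a virtual hook each subclass had to override.
class NcclOpContext {
 public:
  explicit NcclOpContext(Communicator type) : type_(type) {}

  void PopulateCommStrategy(int& rank, int& size) const {
    if (type_ == Communicator::GLOBAL) {
      rank = 0; size = 4;  // stand-ins for controller->GetRank()/GetSize()
    } else if (type_ == Communicator::LOCAL) {
      rank = 0; size = 2;  // stand-ins for GetLocalRank()/GetLocalSize()
    } else {
      throw std::logic_error("unsupported communicator type");
    }
  }

 private:
  Communicator type_;
};

class Allreduce {
 public:
  // GLOBAL by default; a hierarchical op passes LOCAL instead.
  explicit Allreduce(Communicator type = Communicator::GLOBAL)
      : op_context_(type) {}

  void Execute() {
    int rank = 0, size = 0;
    op_context_.PopulateCommStrategy(rank, size);
    std::cout << "rank " << rank << " of " << size << "\n";
  }

 private:
  NcclOpContext op_context_;  // composition replaces inheriting from NCCLOp
};

int main() {
  Allreduce global_op;                      // global communicator
  Allreduce local_op(Communicator::LOCAL);  // per-node communicator
  global_op.Execute();
  local_op.Execute();
}
```

The payoff, visible throughout the diff below, is that the one parameterized helper subsumes the three near-identical PopulateNCCLCommStrategy overrides that are deleted.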
4 changes: 0 additions & 4 deletions horovod/common/message.cc
@@ -56,9 +56,6 @@ const std::string& DataType_Name(DataType value) {
case HOROVOD_BOOL:
static const std::string bool_("bool");
return bool_;
- case HOROVOD_BYTE:
- static const std::string byte_("byte");
- return byte_;
default:
static const std::string unknown("<unknown>");
return unknown;
@@ -68,7 +65,6 @@ const std::string& DataType_Name(DataType value) {
std::size_t DataType_Size(DataType value) {
switch (value) {
case HOROVOD_UINT8:
- case HOROVOD_BYTE:
return sizeof(u_int8_t);
case HOROVOD_INT8:
return sizeof(int8_t);
1 change: 0 additions & 1 deletion horovod/common/message.h
@@ -35,7 +35,6 @@ enum DataType {
HOROVOD_FLOAT32 = 7,
HOROVOD_FLOAT64 = 8,
HOROVOD_BOOL = 9,
- HOROVOD_BYTE = 10,
};

const std::string& DataType_Name(DataType value);
2 changes: 0 additions & 2 deletions horovod/common/mpi/mpi_context.cc
@@ -53,8 +53,6 @@ MPI_Datatype MPIContext::GetMPIDataType(const DataType dtype) {
return MPI_DOUBLE;
case HOROVOD_BOOL:
return MPI_C_BOOL;
- case HOROVOD_BYTE:
- return MPI_BYTE;
default:
throw std::logic_error("Type " + DataType_Name(dtype) +
" is not supported in MPI mode.");
19 changes: 6 additions & 13 deletions horovod/common/ops/adasum_cuda_operations.cc
@@ -22,7 +22,7 @@ AdasumCudaAllreduceOp::AdasumCudaAllreduceOp(MPIContext* mpi_context,
NCCLContext* nccl_context,
CUDAContext* cuda_context,
HorovodGlobalState* global_state)
- : NCCLAllreduce(nccl_context, cuda_context, global_state),
+ : NCCLAllreduce(nccl_context, cuda_context, global_state, Communicator::LOCAL),
AdasumMPI(mpi_context, global_state) {
// Pre-allocate host buffer size equal to the fusion buffer length
current_host_buffer_length =
@@ -66,7 +66,7 @@ AdasumCudaAllreduceOp::NcclHierarchical(std::vector<TensorTableEntry>& entries,
nccl_device_map.push_back(response.devices()[rank]);
}
cuda_op_context_.InitCUDA(entries);
- InitNCCLComm(entries, nccl_device_map);
+ nccl_op_context_.InitNCCLComm(entries, nccl_device_map);
cuda_op_context_.InitCUDAQueue(entries, response);
const void* fused_input_data;
void* buffer_data;
@@ -157,7 +157,7 @@ AdasumCudaAllreduceOp::NcclHierarchical(std::vector<TensorTableEntry>& entries,
auto nccl_result = ncclReduceScatter(
fused_input_data, buffer_data_at_rank_offset,
(size_t)num_elements_per_rank, GetNCCLDataType(first_entry.tensor),
- ncclSum, *nccl_comm_, *cuda_op_context_.stream);
+ ncclSum, *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream);

nccl_context_->ErrorCheck("ncclReduceScatter", nccl_result);
if (global_state_->timeline.Initialized()) {
@@ -172,7 +172,7 @@ AdasumCudaAllreduceOp::NcclHierarchical(std::vector<TensorTableEntry>& entries,
auto nccl_result = ncclReduce(
fused_input_data_remainder, buffer_data_remainder,
(size_t)num_elements_remaining, GetNCCLDataType(first_entry.tensor),
- ncclSum, root_rank, *nccl_comm_, *cuda_op_context_.stream);
+ ncclSum, root_rank, *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream);

nccl_context_->ErrorCheck("ncclReduce", nccl_result);
if (global_state_->timeline.Initialized()) {
@@ -272,7 +272,7 @@ AdasumCudaAllreduceOp::NcclHierarchical(std::vector<TensorTableEntry>& entries,
"ncclAllGather", ncclAllGather(buffer_data_at_rank_offset, buffer_data,
(size_t)num_elements_per_rank,
GetNCCLDataType(first_entry.tensor),
- *nccl_comm_, *cuda_op_context_.stream));
+ *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream));
if (global_state_->timeline.Initialized()) {
cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_ALLGATHER,
*cuda_op_context_.stream);
@@ -282,7 +282,7 @@ AdasumCudaAllreduceOp::NcclHierarchical(std::vector<TensorTableEntry>& entries,
nccl_context_->ErrorCheck(
"ncclBcast",
ncclBcast(buffer_data_remainder, (size_t)num_elements_remaining,
- GetNCCLDataType(first_entry.tensor), root_rank, *nccl_comm_,
+ GetNCCLDataType(first_entry.tensor), root_rank, *nccl_op_context_.nccl_comm_,
*cuda_op_context_.stream));
if (global_state_->timeline.Initialized()) {
cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_BCAST,
@@ -304,13 +304,6 @@ AdasumCudaAllreduceOp::NcclHierarchical(std::vector<TensorTableEntry>& entries,
return cuda_op_context_.FinalizeCUDAQueue(entries, false);
}

- void AdasumCudaAllreduceOp::PopulateNCCLCommStrategy(
- int& nccl_rank, int& nccl_size, Communicator& nccl_id_bcast_comm) {
- nccl_rank = global_state_->controller->GetLocalRank();
- nccl_size = global_state_->controller->GetLocalSize();
- nccl_id_bcast_comm = Communicator::LOCAL;
- }

bool AdasumCudaAllreduceOp::Enabled(
const ParameterManager& param_manager,
const std::vector<TensorTableEntry>& entries,
3 changes: 0 additions & 3 deletions horovod/common/ops/adasum_cuda_operations.h
@@ -39,9 +39,6 @@ class AdasumCudaAllreduceOp : public AdasumMPI, public NCCLAllreduce {
const Response& response) override;

protected:
- void PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size,
- Communicator& nccl_id_bcast_comm) override;

Status NcclHierarchical(std::vector<TensorTableEntry>& entries,
const Response& response);

2 changes: 0 additions & 2 deletions horovod/common/ops/ccl_operations.cc
@@ -34,8 +34,6 @@ namespace common {

ccl_datatype_t GetCCLDataType(const std::shared_ptr<Tensor>& tensor) {
switch (tensor->dtype()) {
- case HOROVOD_BYTE:
- return ccl_dtype_char;
case HOROVOD_FLOAT32:
return ccl_dtype_float;
case HOROVOD_FLOAT64:
1 change: 0 additions & 1 deletion horovod/common/ops/gloo_operations.cc
@@ -32,7 +32,6 @@ IGlooAlgorithms* GetAlgorithmsForType(DataType dtype,
GlooContext* gloo_context) {
switch (dtype) {
case HOROVOD_UINT8:
- case HOROVOD_BYTE:
return new GlooAlgorithms<u_int8_t>(gloo_context);
case HOROVOD_INT8:
return new GlooAlgorithms<int8_t>(gloo_context);
56 changes: 28 additions & 28 deletions horovod/common/ops/nccl_operations.cc
@@ -22,7 +22,6 @@ namespace common {
ncclDataType_t GetNCCLDataType(const std::shared_ptr<Tensor> tensor) {
switch (tensor->dtype()) {
case HOROVOD_UINT8:
- case HOROVOD_BYTE:
return ncclUint8;
case HOROVOD_INT8:
return ncclInt8;
@@ -57,12 +56,12 @@ void NCCLContext::ShutDown(){
nccl_comms.clear();
}

- void NCCLOp::InitNCCLComm(const std::vector<TensorTableEntry>& entries,
- const std::vector<int32_t>& nccl_device_map) {
+ void NCCLOpContext::InitNCCLComm(const std::vector<TensorTableEntry>& entries,
+ const std::vector<int32_t>& nccl_device_map) {
// Ensure NCCL communicator is in the map before executing operation.
- ncclComm_t& nccl_comm = nccl_context_->nccl_comms[hvd_global_state_->current_nccl_stream][nccl_device_map];
+ ncclComm_t& nccl_comm = nccl_context_->nccl_comms[global_state_->current_nccl_stream][nccl_device_map];
if (nccl_comm == nullptr) {
- auto& timeline = hvd_global_state_->timeline;
+ auto& timeline = global_state_->timeline;
timeline.ActivityStartAll(entries, INIT_NCCL);

int nccl_rank, nccl_size;
@@ -74,7 +73,7 @@ void NCCLOp::InitNCCLComm(const std::vector<TensorTableEntry>& entries,
nccl_context_->ErrorCheck("ncclGetUniqueId", ncclGetUniqueId(&nccl_id));
}

- hvd_global_state_->controller->Bcast((void*)&nccl_id, sizeof(nccl_id), 0,
+ global_state_->controller->Bcast((void*)&nccl_id, sizeof(nccl_id), 0,
nccl_id_bcast_comm);

ncclComm_t new_nccl_comm;
@@ -84,27 +83,35 @@ void NCCLOp::InitNCCLComm(const std::vector<TensorTableEntry>& entries,

// Barrier helps NCCL to synchronize after initialization and avoid
// deadlock that we've been seeing without it.
- hvd_global_state_->controller->Barrier(Communicator::GLOBAL);
+ global_state_->controller->Barrier(Communicator::GLOBAL);

timeline.ActivityEndAll(entries);
}

nccl_comm_ = &nccl_comm;
}

- void NCCLOp::PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size,
- Communicator& nccl_id_bcast_comm) {
- nccl_rank = hvd_global_state_->controller->GetRank();
- nccl_size = hvd_global_state_->controller->GetSize();
- nccl_id_bcast_comm = Communicator::GLOBAL;
+ void NCCLOpContext::PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size,
+ Communicator& nccl_id_bcast_comm) {
+ if (communicator_type_ == Communicator::GLOBAL) {
+ nccl_rank = global_state_->controller->GetRank();
+ nccl_size = global_state_->controller->GetSize();
+ } else if (communicator_type_ == Communicator::LOCAL) {
+ nccl_rank = global_state_->controller->GetLocalRank();
+ nccl_size = global_state_->controller->GetLocalSize();
+ } else {
+ throw std::logic_error("Communicator type " + std::to_string(communicator_type_) +
+ " is not supported in NCCL mode.");
+ }
+ nccl_id_bcast_comm = communicator_type_;
}

Status NCCLAllreduce::Execute(std::vector<TensorTableEntry>& entries,
const Response& response) {
auto& first_entry = entries[0];

cuda_op_context_.InitCUDA(entries);
- InitNCCLComm(entries, response.devices());
+ nccl_op_context_.InitNCCLComm(entries, response.devices());
cuda_op_context_.InitCUDAQueue(entries, response);

const void* fused_input_data;
@@ -133,7 +140,7 @@ Status NCCLAllreduce::Execute(std::vector<TensorTableEntry>& entries,
auto nccl_result = ncclAllReduce(fused_input_data, buffer_data,
(size_t) num_elements,
GetNCCLDataType(first_entry.tensor), ncclSum,
- *nccl_comm_, *cuda_op_context_.stream);
+ *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream);
nccl_context_->ErrorCheck("ncclAllReduce", nccl_result);
if (global_state_->timeline.Initialized()) {
cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_ALLREDUCE, *cuda_op_context_.stream);
@@ -166,7 +173,7 @@ NCCLHierarchicalAllreduce::Execute(std::vector<TensorTableEntry>& entries,
}

cuda_op_context_.InitCUDA(entries);
- InitNCCLComm(entries, nccl_device_map);
+ nccl_op_context_.InitNCCLComm(entries, nccl_device_map);
cuda_op_context_.InitCUDAQueue(entries, response);

const void* fused_input_data;
@@ -258,7 +265,7 @@ NCCLHierarchicalAllreduce::Execute(std::vector<TensorTableEntry>& entries,
buffer_data_at_rank_offset,
(size_t) num_elements_per_rank,
GetNCCLDataType(first_entry.tensor),
- ncclSum, *nccl_comm_, *cuda_op_context_.stream);
+ ncclSum, *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream);
nccl_context_->ErrorCheck("ncclReduceScatter", nccl_result);
if (global_state_->timeline.Initialized()) {
cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_REDUCESCATTER, *cuda_op_context_.stream);
@@ -272,7 +279,7 @@ NCCLHierarchicalAllreduce::Execute(std::vector<TensorTableEntry>& entries,
buffer_data_remainder,
(size_t) num_elements_remaining,
GetNCCLDataType(first_entry.tensor), ncclSum,
- root_rank, *nccl_comm_, *cuda_op_context_.stream);
+ root_rank, *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream);
nccl_context_->ErrorCheck("ncclReduce", nccl_result);
if (global_state_->timeline.Initialized()) {
cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_REDUCE, *cuda_op_context_.stream);
@@ -322,7 +329,7 @@ NCCLHierarchicalAllreduce::Execute(std::vector<TensorTableEntry>& entries,
ncclAllGather(buffer_data_at_rank_offset, buffer_data,
(size_t) num_elements_per_rank,
GetNCCLDataType(first_entry.tensor),
- *nccl_comm_, *cuda_op_context_.stream));
+ *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream));
if (global_state_->timeline.Initialized()) {
cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_ALLGATHER, *cuda_op_context_.stream);
}
@@ -332,7 +339,7 @@ NCCLHierarchicalAllreduce::Execute(std::vector<TensorTableEntry>& entries,
ncclBcast(buffer_data_remainder,
(size_t) num_elements_remaining,
GetNCCLDataType(first_entry.tensor), root_rank,
- *nccl_comm_, *cuda_op_context_.stream));
+ *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream));
if (global_state_->timeline.Initialized()) {
cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_BCAST, *cuda_op_context_.stream);
}
@@ -358,13 +365,6 @@ bool NCCLHierarchicalAllreduce::Enabled(const ParameterManager& param_manager,
}
return param_manager.HierarchicalAllreduce();
}

- void NCCLHierarchicalAllreduce::PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size,
- Communicator& nccl_id_bcast_comm) {
- nccl_rank = global_state_->controller->GetLocalRank();
- nccl_size = global_state_->controller->GetLocalSize();
- nccl_id_bcast_comm = Communicator::LOCAL;
- }
#endif

Status NCCLBroadcast::Execute(std::vector<TensorTableEntry>& entries,
@@ -373,7 +373,7 @@ Status NCCLBroadcast::Execute(std::vector<TensorTableEntry>& entries,
auto e = entries[0];

cuda_op_context_.InitCUDA(entries);
- InitNCCLComm(entries, response.devices());
+ nccl_op_context_.InitNCCLComm(entries, response.devices());
cuda_op_context_.InitCUDAQueue(entries, response);

// On root rank, ncclbcast sends data, on other ranks it receives data.
@@ -391,7 +391,7 @@ Status NCCLBroadcast::Execute(std::vector<TensorTableEntry>& entries,
e.tensor->shape().num_elements() *
DataType_Size(e.tensor->dtype()),
ncclChar, e.root_rank,
- *nccl_comm_, *cuda_op_context_.stream));
+ *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream));
if (global_state_->timeline.Initialized()) {
cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_BCAST, *cuda_op_context_.stream);
}
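For context on the InitNCCLComm flow above: one rank creates an ncclUniqueId, the controller broadcasts it over the chosen communicator, all ranks then call ncclCommInitRank collectively, and a barrier closes the init phase (the in-code comment notes this avoids a deadlock NCCL otherwise hits). A stripped-down sketch of that same handshake using plain MPI + NCCL, assuming one GPU per rank and omitting error checking — a pattern illustration, not Horovod code:

```cpp
#include <mpi.h>
#include <nccl.h>
#include <cuda_runtime.h>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  ncclUniqueId id;
  if (rank == 0) ncclGetUniqueId(&id);  // root creates the id...
  MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);  // ...all ranks learn it

  cudaSetDevice(rank);             // assumes one device per rank
  ncclComm_t comm;
  ncclCommInitRank(&comm, size, id, rank);  // collective: every rank must call

  MPI_Barrier(MPI_COMM_WORLD);     // mirrors the post-init barrier in the diff

  ncclCommDestroy(comm);
  MPI_Finalize();
  return 0;
}
```

In the refactored code, the communicator over which the id is broadcast (global or node-local) is exactly what PopulateNCCLCommStrategy now derives from communicator_type_.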
53 changes: 33 additions & 20 deletions horovod/common/ops/nccl_operations.h
@@ -38,47 +38,63 @@ struct NCCLContext {
void ShutDown();
};

- class NCCLOp {
+ class NCCLOpContext {
public:
- NCCLOp(NCCLContext* nccl_context, HorovodGlobalState* global_state)
+ NCCLOpContext(NCCLContext* nccl_context, HorovodGlobalState* global_state,
+ horovod::common::Communicator communicator_type)
: nccl_context_(nccl_context),
nccl_comm_(nullptr),
- hvd_global_state_(global_state){};
+ global_state_(global_state),
+ communicator_type_(communicator_type){};

- protected:
+ private:
void InitNCCLComm(const std::vector<TensorTableEntry>& entries,
const std::vector<int32_t>& nccl_device_map);

- virtual void PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size,
- Communicator& nccl_id_bcast_comm);
+ void PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size,
+ Communicator& nccl_id_bcast_comm);

NCCLContext* nccl_context_;
ncclComm_t* nccl_comm_;

- private:
- HorovodGlobalState* hvd_global_state_;
+ HorovodGlobalState* global_state_;
+ horovod::common::Communicator communicator_type_;
};

- class NCCLAllreduce : public NCCLOp, public CUDAAllreduce {
+ class NCCLAllreduce : public CUDAAllreduce {
public:
NCCLAllreduce(NCCLContext* nccl_context, CUDAContext* cuda_context,
- HorovodGlobalState* global_state)
- : NCCLOp(nccl_context, global_state),
- CUDAAllreduce(cuda_context, global_state){};
+ HorovodGlobalState* global_state,
+ horovod::common::Communicator communicator_type = Communicator::GLOBAL)
+ : CUDAAllreduce(cuda_context, global_state),
+ nccl_context_(nccl_context),
+ nccl_op_context_(nccl_context, global_state, communicator_type),
+ global_state_(global_state){};

Status Execute(std::vector<TensorTableEntry>& entries,
const Response& response) override;

+ protected:
+ NCCLContext* nccl_context_;
+ NCCLOpContext nccl_op_context_;
+ HorovodGlobalState* global_state_;
};

- class NCCLBroadcast : public NCCLOp, public CUDABroadcast {
+ class NCCLBroadcast : public CUDABroadcast {
public:
NCCLBroadcast(NCCLContext* nccl_context, CUDAContext* cuda_context,
HorovodGlobalState* global_state)
- : NCCLOp(nccl_context, global_state),
- CUDABroadcast(cuda_context, global_state){};
+ : CUDABroadcast(cuda_context, global_state),
+ nccl_context_(nccl_context),
+ nccl_op_context_(nccl_context, global_state, Communicator::GLOBAL),
+ global_state_(global_state){};

Status Execute(std::vector<TensorTableEntry>& entries,
const Response& response) override;

+ protected:
+ NCCLContext* nccl_context_;
+ NCCLOpContext nccl_op_context_;
+ HorovodGlobalState* global_state_;
};

#if HAVE_MPI
@@ -87,7 +103,7 @@ class NCCLHierarchicalAllreduce : public NCCLAllreduce {
NCCLHierarchicalAllreduce(NCCLContext* nccl_context, MPIContext* mpi_context,
CUDAContext* cuda_context,
HorovodGlobalState* global_state)
- : NCCLAllreduce(nccl_context, cuda_context, global_state),
+ : NCCLAllreduce(nccl_context, cuda_context, global_state, Communicator::LOCAL),
mpi_context_(mpi_context){};

Status Execute(std::vector<TensorTableEntry>& entries,
@@ -98,9 +114,6 @@ class NCCLHierarchicalAllreduce : public NCCLAllreduce {
const Response& response) const override;

private:
- void PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size,
- Communicator& nccl_id_bcast_comm) override;

MPIContext* mpi_context_;
};
#endif
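The header changes make the communicator scope a constructor parameter with a GLOBAL default, so NCCLHierarchicalAllreduce here (and AdasumCudaAllreduceOp earlier) opt into LOCAL simply by forwarding an argument up the chain instead of overriding a virtual method. A minimal stand-alone illustration of that forwarding, with hypothetical simplified types rather than the real class declarations:

```cpp
#include <iostream>

enum Communicator { GLOBAL = 0, LOCAL = 1 };

struct NCCLAllreduceLike {
  // The default keeps existing call sites working: plain allreduce stays GLOBAL.
  explicit NCCLAllreduceLike(Communicator type = Communicator::GLOBAL)
      : communicator_type(type) {}
  Communicator communicator_type;
};

struct HierarchicalLike : NCCLAllreduceLike {
  // Mirrors NCCLAllreduce(..., Communicator::LOCAL) in the diff above.
  HierarchicalLike() : NCCLAllreduceLike(Communicator::LOCAL) {}
};

int main() {
  NCCLAllreduceLike flat;   // GLOBAL via the default argument
  HierarchicalLike nested;  // LOCAL, forwarded by the subclass
  std::cout << flat.communicator_type << " "
            << nested.communicator_type << "\n";  // prints "0 1"
}
```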
