From db24f3035f5f3970ab8e17a8d139303dad665504 Mon Sep 17 00:00:00 2001 From: Nicolas V Castet Date: Tue, 7 Jan 2020 08:54:59 -0600 Subject: [PATCH] Address review comments Signed-off-by: Nicolas V Castet --- horovod/common/message.cc | 4 -- horovod/common/message.h | 1 - horovod/common/mpi/mpi_context.cc | 2 - horovod/common/ops/adasum_cuda_operations.cc | 19 +++---- horovod/common/ops/adasum_cuda_operations.h | 3 -- horovod/common/ops/ccl_operations.cc | 2 - horovod/common/ops/gloo_operations.cc | 1 - horovod/common/ops/nccl_operations.cc | 56 ++++++++++---------- horovod/common/ops/nccl_operations.h | 53 +++++++++++------- horovod/common/wire/message.fbs | 3 +- horovod/common/wire/message_generated.h | 27 +++++++--- 11 files changed, 88 insertions(+), 83 deletions(-) diff --git a/horovod/common/message.cc b/horovod/common/message.cc index 57527f0498..17f2dee29c 100644 --- a/horovod/common/message.cc +++ b/horovod/common/message.cc @@ -56,9 +56,6 @@ const std::string& DataType_Name(DataType value) { case HOROVOD_BOOL: static const std::string bool_("bool"); return bool_; - case HOROVOD_BYTE: - static const std::string byte_("byte"); - return byte_; default: static const std::string unknown(""); return unknown; @@ -68,7 +65,6 @@ const std::string& DataType_Name(DataType value) { std::size_t DataType_Size(DataType value) { switch (value) { case HOROVOD_UINT8: - case HOROVOD_BYTE: return sizeof(u_int8_t); case HOROVOD_INT8: return sizeof(int8_t); diff --git a/horovod/common/message.h b/horovod/common/message.h index 787383fbd2..064d18d55e 100644 --- a/horovod/common/message.h +++ b/horovod/common/message.h @@ -35,7 +35,6 @@ enum DataType { HOROVOD_FLOAT32 = 7, HOROVOD_FLOAT64 = 8, HOROVOD_BOOL = 9, - HOROVOD_BYTE = 10, }; const std::string& DataType_Name(DataType value); diff --git a/horovod/common/mpi/mpi_context.cc b/horovod/common/mpi/mpi_context.cc index feec803d61..a960956750 100644 --- a/horovod/common/mpi/mpi_context.cc +++ b/horovod/common/mpi/mpi_context.cc @@ -53,8 +53,6 @@ MPI_Datatype MPIContext::GetMPIDataType(const DataType dtype) { return MPI_DOUBLE; case HOROVOD_BOOL: return MPI_C_BOOL; - case HOROVOD_BYTE: - return MPI_BYTE; default: throw std::logic_error("Type " + DataType_Name(dtype) + " is not supported in MPI mode."); diff --git a/horovod/common/ops/adasum_cuda_operations.cc b/horovod/common/ops/adasum_cuda_operations.cc index 9b6005f94f..a85957a535 100644 --- a/horovod/common/ops/adasum_cuda_operations.cc +++ b/horovod/common/ops/adasum_cuda_operations.cc @@ -22,7 +22,7 @@ AdasumCudaAllreduceOp::AdasumCudaAllreduceOp(MPIContext* mpi_context, NCCLContext* nccl_context, CUDAContext* cuda_context, HorovodGlobalState* global_state) - : NCCLAllreduce(nccl_context, cuda_context, global_state), + : NCCLAllreduce(nccl_context, cuda_context, global_state, Communicator::LOCAL), AdasumMPI(mpi_context, global_state) { // Pre-allocate host buffer size equal to the fusion buffer length current_host_buffer_length = @@ -66,7 +66,7 @@ AdasumCudaAllreduceOp::NcclHierarchical(std::vector& entries, nccl_device_map.push_back(response.devices()[rank]); } cuda_op_context_.InitCUDA(entries); - InitNCCLComm(entries, nccl_device_map); + nccl_op_context_.InitNCCLComm(entries, nccl_device_map); cuda_op_context_.InitCUDAQueue(entries, response); const void* fused_input_data; void* buffer_data; @@ -157,7 +157,7 @@ AdasumCudaAllreduceOp::NcclHierarchical(std::vector& entries, auto nccl_result = ncclReduceScatter( fused_input_data, buffer_data_at_rank_offset, (size_t)num_elements_per_rank, GetNCCLDataType(first_entry.tensor), - ncclSum, *nccl_comm_, *cuda_op_context_.stream); + ncclSum, *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream); nccl_context_->ErrorCheck("ncclReduceScatter", nccl_result); if (global_state_->timeline.Initialized()) { @@ -172,7 +172,7 @@ AdasumCudaAllreduceOp::NcclHierarchical(std::vector& entries, auto nccl_result = ncclReduce( fused_input_data_remainder, buffer_data_remainder, (size_t)num_elements_remaining, GetNCCLDataType(first_entry.tensor), - ncclSum, root_rank, *nccl_comm_, *cuda_op_context_.stream); + ncclSum, root_rank, *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream); nccl_context_->ErrorCheck("ncclReduce", nccl_result); if (global_state_->timeline.Initialized()) { @@ -272,7 +272,7 @@ AdasumCudaAllreduceOp::NcclHierarchical(std::vector& entries, "ncclAllGather", ncclAllGather(buffer_data_at_rank_offset, buffer_data, (size_t)num_elements_per_rank, GetNCCLDataType(first_entry.tensor), - *nccl_comm_, *cuda_op_context_.stream)); + *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream)); if (global_state_->timeline.Initialized()) { cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_ALLGATHER, *cuda_op_context_.stream); @@ -282,7 +282,7 @@ AdasumCudaAllreduceOp::NcclHierarchical(std::vector& entries, nccl_context_->ErrorCheck( "ncclBcast", ncclBcast(buffer_data_remainder, (size_t)num_elements_remaining, - GetNCCLDataType(first_entry.tensor), root_rank, *nccl_comm_, + GetNCCLDataType(first_entry.tensor), root_rank, *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream)); if (global_state_->timeline.Initialized()) { cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_BCAST, @@ -304,13 +304,6 @@ AdasumCudaAllreduceOp::NcclHierarchical(std::vector& entries, return cuda_op_context_.FinalizeCUDAQueue(entries, false); } -void AdasumCudaAllreduceOp::PopulateNCCLCommStrategy( - int& nccl_rank, int& nccl_size, Communicator& nccl_id_bcast_comm) { - nccl_rank = global_state_->controller->GetLocalRank(); - nccl_size = global_state_->controller->GetLocalSize(); - nccl_id_bcast_comm = Communicator::LOCAL; -} - bool AdasumCudaAllreduceOp::Enabled( const ParameterManager& param_manager, const std::vector& entries, diff --git a/horovod/common/ops/adasum_cuda_operations.h b/horovod/common/ops/adasum_cuda_operations.h index 6223a712a1..8d92bd970c 100644 --- a/horovod/common/ops/adasum_cuda_operations.h +++ b/horovod/common/ops/adasum_cuda_operations.h @@ -39,9 +39,6 @@ class AdasumCudaAllreduceOp : public AdasumMPI, public NCCLAllreduce { const Response& response) override; protected: - void PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size, - Communicator& nccl_id_bcast_comm) override; - Status NcclHierarchical(std::vector& entries, const Response& response); diff --git a/horovod/common/ops/ccl_operations.cc b/horovod/common/ops/ccl_operations.cc index 6116785b5c..e87627ff58 100644 --- a/horovod/common/ops/ccl_operations.cc +++ b/horovod/common/ops/ccl_operations.cc @@ -34,8 +34,6 @@ namespace common { ccl_datatype_t GetCCLDataType(const std::shared_ptr& tensor) { switch (tensor->dtype()) { - case HOROVOD_BYTE: - return ccl_dtype_char; case HOROVOD_FLOAT32: return ccl_dtype_float; case HOROVOD_FLOAT64: diff --git a/horovod/common/ops/gloo_operations.cc b/horovod/common/ops/gloo_operations.cc index 6faa4b850c..f90a260aa6 100644 --- a/horovod/common/ops/gloo_operations.cc +++ b/horovod/common/ops/gloo_operations.cc @@ -32,7 +32,6 @@ IGlooAlgorithms* GetAlgorithmsForType(DataType dtype, GlooContext* gloo_context) { switch (dtype) { case HOROVOD_UINT8: - case HOROVOD_BYTE: return new GlooAlgorithms(gloo_context); case HOROVOD_INT8: return new GlooAlgorithms(gloo_context); diff --git a/horovod/common/ops/nccl_operations.cc b/horovod/common/ops/nccl_operations.cc index e14a6f3966..58e5aef0ae 100644 --- a/horovod/common/ops/nccl_operations.cc +++ b/horovod/common/ops/nccl_operations.cc @@ -22,7 +22,6 @@ namespace common { ncclDataType_t GetNCCLDataType(const std::shared_ptr tensor) { switch (tensor->dtype()) { case HOROVOD_UINT8: - case HOROVOD_BYTE: return ncclUint8; case HOROVOD_INT8: return ncclInt8; @@ -57,12 +56,12 @@ void NCCLContext::ShutDown(){ nccl_comms.clear(); } -void NCCLOp::InitNCCLComm(const std::vector& entries, - const std::vector& nccl_device_map) { +void NCCLOpContext::InitNCCLComm(const std::vector& entries, + const std::vector& nccl_device_map) { // Ensure NCCL communicator is in the map before executing operation. - ncclComm_t& nccl_comm = nccl_context_->nccl_comms[hvd_global_state_->current_nccl_stream][nccl_device_map]; + ncclComm_t& nccl_comm = nccl_context_->nccl_comms[global_state_->current_nccl_stream][nccl_device_map]; if (nccl_comm == nullptr) { - auto& timeline = hvd_global_state_->timeline; + auto& timeline = global_state_->timeline; timeline.ActivityStartAll(entries, INIT_NCCL); int nccl_rank, nccl_size; @@ -74,7 +73,7 @@ void NCCLOp::InitNCCLComm(const std::vector& entries, nccl_context_->ErrorCheck("ncclGetUniqueId", ncclGetUniqueId(&nccl_id)); } - hvd_global_state_->controller->Bcast((void*)&nccl_id, sizeof(nccl_id), 0, + global_state_->controller->Bcast((void*)&nccl_id, sizeof(nccl_id), 0, nccl_id_bcast_comm); ncclComm_t new_nccl_comm; @@ -84,7 +83,7 @@ void NCCLOp::InitNCCLComm(const std::vector& entries, // Barrier helps NCCL to synchronize after initialization and avoid // deadlock that we've been seeing without it. - hvd_global_state_->controller->Barrier(Communicator::GLOBAL); + global_state_->controller->Barrier(Communicator::GLOBAL); timeline.ActivityEndAll(entries); } @@ -92,11 +91,19 @@ void NCCLOp::InitNCCLComm(const std::vector& entries, nccl_comm_ = &nccl_comm; } -void NCCLOp::PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size, - Communicator& nccl_id_bcast_comm) { - nccl_rank = hvd_global_state_->controller->GetRank(); - nccl_size = hvd_global_state_->controller->GetSize(); - nccl_id_bcast_comm = Communicator::GLOBAL; +void NCCLOpContext::PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size, + Communicator& nccl_id_bcast_comm) { + if (communicator_type_ == Communicator::GLOBAL) { + nccl_rank = global_state_->controller->GetRank(); + nccl_size = global_state_->controller->GetSize(); + } else if (communicator_type_ == Communicator::LOCAL) { + nccl_rank = global_state_->controller->GetLocalRank(); + nccl_size = global_state_->controller->GetLocalSize(); + } else { + throw std::logic_error("Communicator type " + std::to_string(communicator_type_) + + " is not supported in NCCL mode."); + } + nccl_id_bcast_comm = communicator_type_; } Status NCCLAllreduce::Execute(std::vector& entries, @@ -104,7 +111,7 @@ Status NCCLAllreduce::Execute(std::vector& entries, auto& first_entry = entries[0]; cuda_op_context_.InitCUDA(entries); - InitNCCLComm(entries, response.devices()); + nccl_op_context_.InitNCCLComm(entries, response.devices()); cuda_op_context_.InitCUDAQueue(entries, response); const void* fused_input_data; @@ -133,7 +140,7 @@ Status NCCLAllreduce::Execute(std::vector& entries, auto nccl_result = ncclAllReduce(fused_input_data, buffer_data, (size_t) num_elements, GetNCCLDataType(first_entry.tensor), ncclSum, - *nccl_comm_, *cuda_op_context_.stream); + *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream); nccl_context_->ErrorCheck("ncclAllReduce", nccl_result); if (global_state_->timeline.Initialized()) { cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_ALLREDUCE, *cuda_op_context_.stream); @@ -166,7 +173,7 @@ NCCLHierarchicalAllreduce::Execute(std::vector& entries, } cuda_op_context_.InitCUDA(entries); - InitNCCLComm(entries, nccl_device_map); + nccl_op_context_.InitNCCLComm(entries, nccl_device_map); cuda_op_context_.InitCUDAQueue(entries, response); const void* fused_input_data; @@ -258,7 +265,7 @@ NCCLHierarchicalAllreduce::Execute(std::vector& entries, buffer_data_at_rank_offset, (size_t) num_elements_per_rank, GetNCCLDataType(first_entry.tensor), - ncclSum, *nccl_comm_, *cuda_op_context_.stream); + ncclSum, *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream); nccl_context_->ErrorCheck("ncclReduceScatter", nccl_result); if (global_state_->timeline.Initialized()) { cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_REDUCESCATTER, *cuda_op_context_.stream); @@ -272,7 +279,7 @@ NCCLHierarchicalAllreduce::Execute(std::vector& entries, buffer_data_remainder, (size_t) num_elements_remaining, GetNCCLDataType(first_entry.tensor), ncclSum, - root_rank, *nccl_comm_, *cuda_op_context_.stream); + root_rank, *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream); nccl_context_->ErrorCheck("ncclReduce", nccl_result); if (global_state_->timeline.Initialized()) { cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_REDUCE, *cuda_op_context_.stream); @@ -322,7 +329,7 @@ NCCLHierarchicalAllreduce::Execute(std::vector& entries, ncclAllGather(buffer_data_at_rank_offset, buffer_data, (size_t) num_elements_per_rank, GetNCCLDataType(first_entry.tensor), - *nccl_comm_, *cuda_op_context_.stream)); + *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream)); if (global_state_->timeline.Initialized()) { cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_ALLGATHER, *cuda_op_context_.stream); } @@ -332,7 +339,7 @@ NCCLHierarchicalAllreduce::Execute(std::vector& entries, ncclBcast(buffer_data_remainder, (size_t) num_elements_remaining, GetNCCLDataType(first_entry.tensor), root_rank, - *nccl_comm_, *cuda_op_context_.stream)); + *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream)); if (global_state_->timeline.Initialized()) { cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_BCAST, *cuda_op_context_.stream); } @@ -358,13 +365,6 @@ bool NCCLHierarchicalAllreduce::Enabled(const ParameterManager& param_manager, } return param_manager.HierarchicalAllreduce(); } - -void NCCLHierarchicalAllreduce::PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size, - Communicator& nccl_id_bcast_comm) { - nccl_rank = global_state_->controller->GetLocalRank(); - nccl_size = global_state_->controller->GetLocalSize(); - nccl_id_bcast_comm = Communicator::LOCAL; -} #endif Status NCCLBroadcast::Execute(std::vector& entries, @@ -373,7 +373,7 @@ Status NCCLBroadcast::Execute(std::vector& entries, auto e = entries[0]; cuda_op_context_.InitCUDA(entries); - InitNCCLComm(entries, response.devices()); + nccl_op_context_.InitNCCLComm(entries, response.devices()); cuda_op_context_.InitCUDAQueue(entries, response); // On root rank, ncclbcast sends data, on other ranks it receives data. @@ -391,7 +391,7 @@ Status NCCLBroadcast::Execute(std::vector& entries, e.tensor->shape().num_elements() * DataType_Size(e.tensor->dtype()), ncclChar, e.root_rank, - *nccl_comm_, *cuda_op_context_.stream)); + *nccl_op_context_.nccl_comm_, *cuda_op_context_.stream)); if (global_state_->timeline.Initialized()) { cuda_context_->RecordEvent(cuda_op_context_.event_queue, NCCL_BCAST, *cuda_op_context_.stream); } diff --git a/horovod/common/ops/nccl_operations.h b/horovod/common/ops/nccl_operations.h index 114f5c855c..8bfc4e11ec 100644 --- a/horovod/common/ops/nccl_operations.h +++ b/horovod/common/ops/nccl_operations.h @@ -38,47 +38,63 @@ struct NCCLContext { void ShutDown(); }; -class NCCLOp { +class NCCLOpContext { public: - NCCLOp(NCCLContext* nccl_context, HorovodGlobalState* global_state) + NCCLOpContext(NCCLContext* nccl_context, HorovodGlobalState* global_state, + horovod::common::Communicator communicator_type) : nccl_context_(nccl_context), nccl_comm_(nullptr), - hvd_global_state_(global_state){}; + global_state_(global_state), + communicator_type_(communicator_type){}; -protected: +private: void InitNCCLComm(const std::vector& entries, const std::vector& nccl_device_map); - virtual void PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size, - Communicator& nccl_id_bcast_comm); + void PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size, + Communicator& nccl_id_bcast_comm); NCCLContext* nccl_context_; ncclComm_t* nccl_comm_; - -private: - HorovodGlobalState* hvd_global_state_; + HorovodGlobalState* global_state_; + horovod::common::Communicator communicator_type_; }; -class NCCLAllreduce : public NCCLOp, public CUDAAllreduce { +class NCCLAllreduce : public CUDAAllreduce { public: NCCLAllreduce(NCCLContext* nccl_context, CUDAContext* cuda_context, - HorovodGlobalState* global_state) - : NCCLOp(nccl_context, global_state), - CUDAAllreduce(cuda_context, global_state){}; + HorovodGlobalState* global_state, + horovod::common::Communicator communicator_type = Communicator::GLOBAL) + : CUDAAllreduce(cuda_context, global_state), + nccl_context_(nccl_context), + nccl_op_context_(nccl_context, global_state, communicator_type), + global_state_(global_state){}; Status Execute(std::vector& entries, const Response& response) override; + +protected: + NCCLContext* nccl_context_; + NCCLOpContext nccl_op_context_; + HorovodGlobalState* global_state_; }; -class NCCLBroadcast : public NCCLOp, public CUDABroadcast { +class NCCLBroadcast : public CUDABroadcast { public: NCCLBroadcast(NCCLContext* nccl_context, CUDAContext* cuda_context, HorovodGlobalState* global_state) - : NCCLOp(nccl_context, global_state), - CUDABroadcast(cuda_context, global_state){}; + : CUDABroadcast(cuda_context, global_state), + nccl_context_(nccl_context), + nccl_op_context_(nccl_context, global_state, Communicator::GLOBAL), + global_state_(global_state){}; Status Execute(std::vector& entries, const Response& response) override; + +protected: + NCCLContext* nccl_context_; + NCCLOpContext nccl_op_context_; + HorovodGlobalState* global_state_; }; #if HAVE_MPI @@ -87,7 +103,7 @@ class NCCLHierarchicalAllreduce : public NCCLAllreduce { NCCLHierarchicalAllreduce(NCCLContext* nccl_context, MPIContext* mpi_context, CUDAContext* cuda_context, HorovodGlobalState* global_state) - : NCCLAllreduce(nccl_context, cuda_context, global_state), + : NCCLAllreduce(nccl_context, cuda_context, global_state, Communicator::LOCAL), mpi_context_(mpi_context){}; Status Execute(std::vector& entries, @@ -98,9 +114,6 @@ class NCCLHierarchicalAllreduce : public NCCLAllreduce { const Response& response) const override; private: - void PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size, - Communicator& nccl_id_bcast_comm) override; - MPIContext* mpi_context_; }; #endif diff --git a/horovod/common/wire/message.fbs b/horovod/common/wire/message.fbs index 8a2ca5cc29..e78563733e 100644 --- a/horovod/common/wire/message.fbs +++ b/horovod/common/wire/message.fbs @@ -28,8 +28,7 @@ enum DataType:byte { HOROVOD_FLOAT16 = 6, HOROVOD_FLOAT32 = 7, HOROVOD_FLOAT64 = 8, - HOROVOD_BOOL = 9, - HOROVOD_BYTE = 10 + HOROVOD_BOOL = 9 } // An Request is a message sent from a rank greater than zero to the diff --git a/horovod/common/wire/message_generated.h b/horovod/common/wire/message_generated.h index 10d8bc5b19..8dc42f0a28 100644 --- a/horovod/common/wire/message_generated.h +++ b/horovod/common/wire/message_generated.h @@ -1,3 +1,19 @@ +// Copyright 2019 Uber Technologies, Inc. All Rights Reserved. +// Modifications copyright Microsoft +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ============================================================================= + // automatically generated by the FlatBuffers compiler, do not modify @@ -29,12 +45,11 @@ enum DataType { DataType_HOROVOD_FLOAT32 = 7, DataType_HOROVOD_FLOAT64 = 8, DataType_HOROVOD_BOOL = 9, - DataType_HOROVOD_BYTE = 10, DataType_MIN = DataType_HOROVOD_UINT8, - DataType_MAX = DataType_HOROVOD_BYTE + DataType_MAX = DataType_HOROVOD_BOOL }; -inline const DataType (&EnumValuesDataType())[11] { +inline const DataType (&EnumValuesDataType())[10] { static const DataType values[] = { DataType_HOROVOD_UINT8, DataType_HOROVOD_INT8, @@ -45,8 +60,7 @@ inline const DataType (&EnumValuesDataType())[11] { DataType_HOROVOD_FLOAT16, DataType_HOROVOD_FLOAT32, DataType_HOROVOD_FLOAT64, - DataType_HOROVOD_BOOL, - DataType_HOROVOD_BYTE + DataType_HOROVOD_BOOL }; return values; } @@ -63,14 +77,13 @@ inline const char * const *EnumNamesDataType() { "HOROVOD_FLOAT32", "HOROVOD_FLOAT64", "HOROVOD_BOOL", - "HOROVOD_BYTE", nullptr }; return names; } inline const char *EnumNameDataType(DataType e) { - if (e < DataType_HOROVOD_UINT8 || e > DataType_HOROVOD_BYTE) return ""; + if (e < DataType_HOROVOD_UINT8 || e > DataType_HOROVOD_BOOL) return ""; const size_t index = static_cast(e); return EnumNamesDataType()[index]; }