Remove unused nccl comm and mpi comm #257

Merged: 14 commits, May 8, 2018
31 changes: 15 additions & 16 deletions chainermn/communicators/_communication_utility.py
@@ -56,23 +56,22 @@ def init_ranks(mpi_comm):
return my_ranks


def init_comms(mpi_comm, intra_rank, intra_size, inter_rank, use_nccl=True):
intra_mpi_comm = mpi_comm.Split(inter_rank, intra_rank)
inter_mpi_comm = mpi_comm.Split(intra_rank, inter_rank)
if use_nccl:
from chainermn import nccl
intra_nccl_comm_id = intra_mpi_comm.bcast(nccl.get_unique_id())
intra_nccl_comm = nccl.NcclCommunicator(
intra_size, intra_nccl_comm_id, intra_rank)
if nccl.get_version() >= 2000:
nccl_comm_id = mpi_comm.bcast(nccl.get_unique_id())
nccl_comm = nccl.NcclCommunicator(
mpi_comm.size, nccl_comm_id, mpi_comm.rank)
else:
nccl_comm = None
        return intra_mpi_comm, inter_mpi_comm, intra_nccl_comm, nccl_comm
    return intra_mpi_comm, inter_mpi_comm
def init_intra_mpi_comm(mpi_comm, intra_rank, inter_rank):
return mpi_comm.Split(inter_rank, intra_rank)


def init_inter_mpi_comm(mpi_comm, intra_rank, inter_rank):
return mpi_comm.Split(intra_rank, inter_rank)


def init_nccl_comm(mpi_comm):
from chainermn import nccl
if mpi_comm.rank == 0:
nccl_comm_id = nccl.get_unique_id()
else:
nccl_comm_id = None
nccl_comm_id = mpi_comm.bcast(nccl_comm_id)
return nccl.NcclCommunicator(mpi_comm.size, nccl_comm_id, mpi_comm.rank)


def inter_allreduce_gpu(
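For orientation, here is a minimal sketch, not part of the diff, of how the three new helpers compose to cover what the old init_comms() did. The five-way unpacking of init_ranks() follows the assignments visible in the mpi_communicator_base.py hunk below, and the device-selection line assumes the usual one-process-per-GPU setup.

# Illustrative sketch only; not code from this PR.
import chainer
import mpi4py.MPI

from chainermn.communicators import _communication_utility

mpi_comm = mpi4py.MPI.COMM_WORLD
_, intra_rank, intra_size, inter_rank, inter_size = \
    _communication_utility.init_ranks(mpi_comm)

# NCCL binds to the CUDA device that is current at initialization time,
# so select this process's device before creating any NCCL communicator.
chainer.cuda.get_device_from_id(intra_rank).use()

# Node-local and inter-node MPI communicators, split exactly as before.
intra_mpi_comm = _communication_utility.init_intra_mpi_comm(
    mpi_comm, intra_rank, inter_rank)
inter_mpi_comm = _communication_utility.init_inter_mpi_comm(
    mpi_comm, intra_rank, inter_rank)

# The NCCL communicator is built over whichever MPI communicator is passed
# in: the node-local one here, or mpi_comm itself as PureNcclCommunicator
# does further down.
intra_nccl_comm = _communication_utility.init_nccl_comm(intra_mpi_comm)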
4 changes: 1 addition & 3 deletions chainermn/communicators/dummy_communicator.py
@@ -11,13 +11,11 @@ class DummyCommunicator(mpi_communicator_base.MpiCommunicatorBase):
"""

def __init__(self, mpi_comm):
super(DummyCommunicator, self).__init__(mpi_comm, use_nccl=True)
super(DummyCommunicator, self).__init__(mpi_comm)

self.gpu_buffer_a = _memory_utility.DeviceMemory()

def allreduce_grad(self, model):
self._init_comms()

params = _memory_utility.extract_params(model)
itemsize = 4
n_elems_total = sum(param.grad.size for param in params)
4 changes: 1 addition & 3 deletions chainermn/communicators/flat_communicator.py
@@ -7,14 +7,12 @@
class FlatCommunicator(mpi_communicator_base.MpiCommunicatorBase):

def __init__(self, mpi_comm):
super(FlatCommunicator, self).__init__(mpi_comm, False)
super(FlatCommunicator, self).__init__(mpi_comm)

self.gpu_buffer_a = _memory_utility.DeviceMemory()
self.gpu_buffer_b = _memory_utility.DeviceMemory()

def allreduce_grad(self, model):
self._init_comms()

params = _memory_utility.extract_params(model)
itemsize = 4
n_elems_total = sum(param.grad.size for param in params)
27 changes: 26 additions & 1 deletion chainermn/communicators/hierarchical_communicator.py
@@ -10,10 +10,35 @@
class HierarchicalCommunicator(mpi_communicator_base.MpiCommunicatorBase):

def __init__(self, mpi_comm):
super(HierarchicalCommunicator, self).__init__(mpi_comm, use_nccl=True)
super(HierarchicalCommunicator, self).__init__(mpi_comm)
if not nccl._available:
raise RuntimeError(
'NCCL is not available. '
'Please confirm that NCCL is enabled in CuPy.'
)

# We have to delay the initialization of communicators. This is because
# NCCL's communicators use the current CUDA devices at the time of
# initialization. Therefore, we have to initialize NCCL communicators
# after users set the devices to use.
self.inter_mpi_comm = None
self.intra_nccl_comm = None

self.gpu_buffer_a = _memory_utility.DeviceMemory()
self.gpu_buffer_b = _memory_utility.DeviceMemory()

def _init_comms(self):
if self.inter_mpi_comm is not None:
assert self.intra_nccl_comm is not None
return

intra_mpi_comm = _communication_utility.init_intra_mpi_comm(
self.mpi_comm, self.intra_rank, self.inter_rank)
self.inter_mpi_comm = _communication_utility.init_inter_mpi_comm(
self.mpi_comm, self.intra_rank, self.inter_rank)
self.intra_nccl_comm = _communication_utility.init_nccl_comm(
intra_mpi_comm)

def allreduce_grad(self, model):
self._init_comms()
stream = chainer.cuda.Stream.null
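The comment about delaying initialization is the crux of the lazy _init_comms() pattern. A short sketch, not part of the diff and assuming chainermn's create_communicator() entry point, of the call order it expects:

# Illustrative sketch only; not code from this PR.
import chainer
import chainermn

comm = chainermn.create_communicator('hierarchical')  # no NCCL comms yet

# Select this process's GPU first; NCCL communicators created later will
# bind to whatever CUDA device is current.
chainer.cuda.get_device_from_id(comm.intra_rank).use()

# The first collective call (bcast_data, allreduce_grad, ...) invokes
# _init_comms(), which creates the MPI splits and the NCCL communicator
# on the device selected above.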
34 changes: 1 addition & 33 deletions chainermn/communicators/mpi_communicator_base.py
@@ -9,7 +9,6 @@
from chainermn.communicators._communication_utility import chunked_bcast_obj
from chainermn.communicators import _memory_utility
from chainermn.communicators import communicator_base
from chainermn import nccl


def _cnt_to_dsp(cnt):
@@ -48,27 +47,10 @@ class MpiCommunicatorBase(communicator_base.CommunicatorBase):

'''

def __init__(self, mpi_comm, use_nccl=False):
def __init__(self, mpi_comm):
self.mpi_comm = mpi_comm
self._init_ranks()

if use_nccl and not nccl._available:
raise RuntimeError(
'NCCL is not available. '
'Please confirm that NCCL is enabled in CuPy.'
)

self.use_nccl = use_nccl

# We have to delay the initialization of communicators. This is because
# NCCL's communicators use the current CUDA devices at the time of
# initialization. Therefore, we have to initialize NCCL communicators
# after users set the devices to use.
self.inter_mpi_comm = None
self.intra_mpi_comm = None
if self.use_nccl:
self.intra_nccl_comm = None

@property
def rank(self):
return self.mpi_comm.rank
@@ -353,17 +335,3 @@ def _init_ranks(self):
self.intra_size = my_ranks[2]
self.inter_rank = my_ranks[3]
self.inter_size = my_ranks[4]

def _init_comms(self):
if self.inter_mpi_comm is not None:
assert self.intra_mpi_comm is not None
assert not self.use_nccl or self.intra_nccl_comm is not None
return

comms = _communication_utility.init_comms(
self.mpi_comm, self.intra_rank, self.intra_size, self.inter_rank,
use_nccl=self.use_nccl)
self.intra_mpi_comm = comms[0]
self.inter_mpi_comm = comms[1]
if self.use_nccl:
self.intra_nccl_comm = comms[2]
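With the use_nccl flag and the shared _init_comms() removed from the base class, each NCCL-backed subclass now carries its own availability check and its own lazy _init_comms(), as the communicator hunks in this PR show. Condensed into one hypothetical skeleton (the class name is illustrative; the calls mirror SingleNodeCommunicator below):

# Hypothetical skeleton of the pattern; not code from this PR.
from chainermn.communicators import _communication_utility
from chainermn.communicators import mpi_communicator_base
from chainermn import nccl


class MyNcclCommunicator(mpi_communicator_base.MpiCommunicatorBase):

    def __init__(self, mpi_comm):
        # The base class no longer takes a use_nccl flag.
        super(MyNcclCommunicator, self).__init__(mpi_comm)
        if not nccl._available:
            raise RuntimeError(
                'NCCL is not available. '
                'Please confirm that NCCL is enabled in CuPy.')
        # Created lazily, after the user has selected a CUDA device.
        self.intra_nccl_comm = None

    def _init_comms(self):
        if self.intra_nccl_comm is not None:
            return
        intra_mpi_comm = _communication_utility.init_intra_mpi_comm(
            self.mpi_comm, self.intra_rank, self.inter_rank)
        self.intra_nccl_comm = _communication_utility.init_nccl_comm(
            intra_mpi_comm)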
28 changes: 27 additions & 1 deletion chainermn/communicators/non_cuda_aware_communicator.py
@@ -2,6 +2,7 @@
import math
import mpi4py.MPI

from chainermn.communicators import _communication_utility
from chainermn.communicators import _memory_utility
from chainermn.communicators import mpi_communicator_base
from chainermn import nccl
@@ -10,12 +11,37 @@
class NonCudaAwareCommunicator(mpi_communicator_base.MpiCommunicatorBase):

def __init__(self, mpi_comm):
super(NonCudaAwareCommunicator, self).__init__(mpi_comm, use_nccl=True)
super(NonCudaAwareCommunicator, self).__init__(mpi_comm)
if not nccl._available:
raise RuntimeError(
'NCCL is not available. '
'Please confirm that NCCL is enabled in CuPy.'
)

# We have to delay the initialization of communicators. This is because
# NCCL's communicators use the current CUDA devices at the time of
# initialization. Therefore, we have to initialize NCCL communicators
# after users set the devices to use.
self.inter_mpi_comm = None
self.intra_nccl_comm = None

self.gpu_buffer_a = _memory_utility.DeviceMemory()
self.gpu_buffer_b = _memory_utility.DeviceMemory()
self.cpu_buffer_a = _memory_utility.HostPinnedMemory()
self.cpu_buffer_b = _memory_utility.HostPinnedMemory()

def _init_comms(self):
if self.inter_mpi_comm is not None:
assert self.intra_nccl_comm is not None
return

intra_mpi_comm = _communication_utility.init_intra_mpi_comm(
self.mpi_comm, self.intra_rank, self.inter_rank)
self.inter_mpi_comm = _communication_utility.init_inter_mpi_comm(
self.mpi_comm, self.intra_rank, self.inter_rank)
self.intra_nccl_comm = _communication_utility.init_nccl_comm(
intra_mpi_comm)

def bcast_data(self, model):
for _, param in sorted(model.namedparams()):
data = param.data
34 changes: 8 additions & 26 deletions chainermn/communicators/pure_nccl_communicator.py
@@ -11,15 +11,15 @@
class PureNcclCommunicator(mpi_communicator_base.MpiCommunicatorBase):

def __init__(self, mpi_comm, allreduce_grad_dtype=None):
super(PureNcclCommunicator, self).__init__(mpi_comm, True)
if nccl.get_version() < 2000:
super(PureNcclCommunicator, self).__init__(mpi_comm)
if not nccl._available or nccl.get_version() < 2000:
raise RuntimeError(
'PureNcclCommunicator is only supported on NCCL 2.0+')
self._init_ranks()

self.inter_mpi_comm = None
self.intra_mpi_comm = None
self.intra_nccl_comm = None
# We have to delay the initialization of communicators. This is because
# NCCL's communicators use the current CUDA devices at the time of
# initialization. Therefore, we have to initialize NCCL communicators
# after users set the devices to use.
self.nccl_comm = None

self.gpu_tmp_buffer = _memory_utility.DeviceMemory()
@@ -39,28 +39,10 @@ def __init__(self, mpi_comm, allreduce_grad_dtype=None):
self.allreduce_dtype_to_grad_dtype_kernel = None
self.div_by_size = None

def _init_ranks(self):
my_ranks = _communication_utility.init_ranks(self.mpi_comm)
assert my_ranks[0] == self.mpi_comm.rank
self._intra_rank = my_ranks[1]
self.intra_size = my_ranks[2]
self.inter_rank = my_ranks[3]
self.inter_size = my_ranks[4]

def _init_comms(self):
if self.inter_mpi_comm is not None:
assert self.intra_mpi_comm is not None
assert self.intra_nccl_comm is not None
assert self.nccl_comm is not None
if self.nccl_comm is not None:
return

comms = _communication_utility.init_comms(
self.mpi_comm, self.intra_rank, self.intra_size, self.inter_rank,
use_nccl=True)
self.intra_mpi_comm = comms[0]
self.inter_mpi_comm = comms[1]
self.intra_nccl_comm = comms[2]
self.nccl_comm = comms[3]
self.nccl_comm = _communication_utility.init_nccl_comm(self.mpi_comm)

def allreduce_grad(self, model):
stream = chainer.cuda.Stream.null
22 changes: 21 additions & 1 deletion chainermn/communicators/single_node_communicator.py
@@ -1,5 +1,6 @@
import chainer.cuda

from chainermn.communicators import _communication_utility
from chainermn.communicators import _memory_utility
from chainermn.communicators import mpi_communicator_base
from chainermn import nccl
@@ -8,15 +9,34 @@
class SingleNodeCommunicator(mpi_communicator_base.MpiCommunicatorBase):

def __init__(self, mpi_comm):
super(SingleNodeCommunicator, self).__init__(mpi_comm, use_nccl=True)
super(SingleNodeCommunicator, self).__init__(mpi_comm)

if self.inter_size != 1:
raise ValueError('SingleNodeCommunicator cannot be used under '
'multi-node settings')
if not nccl._available:
raise RuntimeError(
'NCCL is not available. '
'Please confirm that NCCL is enabled in CuPy.'
)
# We have to delay the initialization of communicators. This is because
# NCCL's communicators use the current CUDA devices at the time of
# initialization. Therefore, we have to initialize NCCL communicators
# after users set the devices to use.
self.intra_nccl_comm = None

self.gpu_buffer_a = _memory_utility.DeviceMemory()
self.gpu_buffer_b = _memory_utility.DeviceMemory()

def _init_comms(self):
if self.intra_nccl_comm is not None:
return

intra_mpi_comm = _communication_utility.init_intra_mpi_comm(
self.mpi_comm, self.intra_rank, self.inter_rank)
self.intra_nccl_comm = _communication_utility.init_nccl_comm(
intra_mpi_comm)

def bcast_data(self, model):
self._init_comms()
stream = chainer.cuda.Stream.null
27 changes: 26 additions & 1 deletion chainermn/communicators/two_dimensional_communicator.py
@@ -12,10 +12,35 @@ class TwoDimensionalCommunicator(mpi_communicator_base.MpiCommunicatorBase):

def __init__(self, mpi_comm=mpi4py.MPI.COMM_WORLD):
super(TwoDimensionalCommunicator, self).__init__(
mpi_comm, use_nccl=True)
mpi_comm)
if not nccl._available:
raise RuntimeError(
'NCCL is not available. '
'Please confirm that NCCL is enabled in CuPy.'
)

# We have to delay the initialization of communicators. This is because
# NCCL's communicators use the current CUDA devices at the time of
# initialization. Therefore, we have to initialize NCCL communicators
# after users set the devices to use.
self.inter_mpi_comm = None
self.intra_nccl_comm = None

self.gpu_buffer_a = _memory_utility.DeviceMemory()
self.gpu_buffer_b = _memory_utility.DeviceMemory()

def _init_comms(self):
if self.inter_mpi_comm is not None:
assert self.intra_nccl_comm is not None
return

intra_mpi_comm = _communication_utility.init_intra_mpi_comm(
self.mpi_comm, self.intra_rank, self.inter_rank)
self.inter_mpi_comm = _communication_utility.init_inter_mpi_comm(
self.mpi_comm, self.intra_rank, self.inter_rank)
self.intra_nccl_comm = _communication_utility.init_nccl_comm(
intra_mpi_comm)

def allreduce_grad(self, model):
self._init_comms()
stream = chainer.cuda.Stream.null
3 changes: 3 additions & 0 deletions tests/chainermn_tests/communicator_tests/test_communicator.py
@@ -231,6 +231,9 @@ def check_collective_communication(param, use_gpu):
check_bcast_data(communicator, model)
check_allreduce_grad(communicator, model)
check_allreduce_grad_empty(communicator, model)
    # barrier() is required before the destructor of PureNcclCommunicator
    # runs, because communication may not have finished yet.
communicator.mpi_comm.barrier()


# chainer.testing.parameterize is not available at functions
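The added barrier guards against tearing a communicator down while peer ranks are still inside a collective. The same pattern outside the test, as a sketch (not from the diff; communicator name per create_communicator()):

# Illustrative sketch only; not code from this PR.
import chainermn

comm = chainermn.create_communicator('pure_nccl')
# ... collectives such as bcast_data() / allreduce_grad() run here ...

# Synchronize all ranks before the communicator object is dropped, so no
# rank releases its NCCL resources while another is still communicating.
comm.mpi_comm.barrier()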
@@ -10,7 +10,7 @@

class NodeAwareNaiveCommunicator(MpiCommunicatorBase):
def __init__(self, mpi_comm):
super(NodeAwareNaiveCommunicator, self).__init__(mpi_comm, False)
super(NodeAwareNaiveCommunicator, self).__init__(mpi_comm)

def allreduce_grad(self, model):
raise NotImplementedError()
@@ -85,6 +85,9 @@ def test_update(self):
chainer.testing.assert_allclose(
self.optimizer.communicated_target.c.W.grad,
(base + 5) * np.ones((5, 4)))
        # barrier() is required before the destructor of PureNcclCommunicator
        # runs, because communication may not have finished yet.
self.comm.mpi_comm.barrier()


class DynamicExampleModel(chainer.Chain):
@@ -206,3 +209,6 @@ def test_update(self):
chainer.testing.assert_allclose(
self.optimizer.communicated_target.c.W.grad,
(base + 11) * np.ones((4, 4)))
        # barrier() is required before the destructor of PureNcclCommunicator
        # runs, because communication may not have finished yet.
self.comm.mpi_comm.barrier()