Skip to content

Commit

Permalink
Merge pull request #1720 from devitocodes/mpi-sparse-setup
Browse files Browse the repository at this point in the history
mpi: Mitigate SparseFunction setup costs
  • Loading branch information
FabioLuporini committed Aug 9, 2021
2 parents d051ffd + 07d2ead commit f53c45a
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 77 deletions.
45 changes: 24 additions & 21 deletions devito/mpi/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from devito.data import LEFT, CENTER, RIGHT, Decomposition
from devito.parameters import configuration
from devito.tools import EnrichedTuple, as_tuple, ctypes_to_cstr, is_integer
from devito.tools import EnrichedTuple, as_tuple, ctypes_to_cstr, filter_ordered
from devito.types import CompositeObject, Object


Expand Down Expand Up @@ -308,29 +308,32 @@ def glb_to_rank(self, index):
Parameters
----------
index : int or list of ints
The index, or list of indices, for which the owning MPI rank(s) is
index : array of ints
The index, or array of indices, for which the owning MPI rank(s) is
retrieved.
"""
assert isinstance(index, (tuple, list))
if len(index) == 0:
assert isinstance(index, np.ndarray)
if index.shape[0] == 0:
return None
elif is_integer(index[0]):
# `index` is a single point
indices = [index]
else:
indices = index
ret = []
for i in indices:
assert len(i) == self.ndim
found = False
for r, j in enumerate(self.all_ranges):
if all(v in j[d] for v, d in zip(i, self.dimensions)):
ret.append(r)
found = True
break
assert found
return as_tuple(ret)
elif sum(index.shape) == 1:
return index

assert index.shape[1] == self.ndim

# Add singleton dimension at the end if only single gridpoint is passed
# instead of support.
if len(index.shape) == 2:
index = np.expand_dims(index, axis=2)

ret = {}
for r, j in enumerate(self.all_ranges):
mins = np.array([b[0] for b in j]).reshape(1, -1, 1)
maxs = np.array([b[-1] for b in j]).reshape(1, -1, 1)
inds = np.where(((index <= maxs) & (index >= mins)).all(axis=1))
if inds[0].size == 0:
continue
ret[r] = filter_ordered(inds[0])
return ret

@property
def neighborhood(self):
Expand Down
103 changes: 49 additions & 54 deletions devito/types/sparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,92 +114,80 @@ def inject(self, *args, **kwargs):
"""
return self.interpolator.inject(*args, **kwargs)

@cached_property
def _point_support(self):
return np.array(tuple(product(range(-self._radius + 1, self._radius + 1),
repeat=self.grid.dim)))

@property
def _support(self):
"""
The grid points surrounding each sparse point within the radius of self's
injection/interpolation operators.
"""
ret = []
for i in self.gridpoints:
support = [range(max(0, j - self._radius + 1), min(M, j + self._radius + 1))
for j, M in zip(i, self.grid.shape)]
ret.append(tuple(product(*support)))
return tuple(ret)
max_shape = np.array(self.grid.shape).reshape(1, self.grid.dim)
minmax = lambda arr: np.minimum(max_shape, np.maximum(0, arr))
return np.stack([minmax(self.gridpoints + s) for s in self._point_support],
axis=2)

@property
def _dist_datamap(self):
return self._build_dist_datamap(support=self._support)

@memoized_meth
def _build_dist_datamap(self, support=None):
"""
Mapper ``M : MPI rank -> required sparse data``.
"""
ret = {}
support = support or self._support
for i, s in enumerate(support):
# Sparse point `i` is "required" by the following ranks
for r in self.grid.distributor.glb_to_rank(s):
ret.setdefault(r, []).append(i)
return {k: filter_ordered(v) for k, v in ret.items()}
return self.grid.distributor.glb_to_rank(self._support) or {}

@property
def _dist_scatter_mask(self):
def _dist_scatter_mask(self, dmap=None):
"""
A mask to index into ``self.data``, which creates a new data array that
logically contains N consecutive groups of sparse data values, where N
is the number of MPI ranks. The i-th group contains the sparse data
values accessible by the i-th MPI rank. Thus, sparse data values along
the boundary of two or more MPI ranks are duplicated.
"""
dmap = self._dist_datamap
dmap = dmap or self._dist_datamap
mask = np.array(flatten(dmap[i] for i in sorted(dmap)), dtype=int)
ret = [slice(None) for i in range(self.ndim)]
ret = [slice(None) for _ in range(self.ndim)]
ret[self._sparse_position] = mask
return tuple(ret)

@property
def _dist_subfunc_scatter_mask(self):
def _dist_subfunc_scatter_mask(self, dmap=None):
"""
This method is analogous to :meth:`_dist_scatter_mask`, although
the mask is now suitable to index into self's SubFunctions, rather
than into ``self.data``.
"""
return self._dist_scatter_mask[self._sparse_position]
return self._dist_scatter_mask(dmap=dmap)[self._sparse_position]

@property
def _dist_gather_mask(self):
def _dist_gather_mask(self, dmap=None):
"""
A mask to index into the ``data`` received upon returning from
``self._dist_alltoall``. This mask creates a new data array in which
duplicate sparse data values have been discarded. The resulting data
array can thus be used to populate ``self.data``.
"""
ret = list(self._dist_scatter_mask)
ret = list(self._dist_scatter_mask(dmap=dmap))
mask = ret[self._sparse_position]
inds = np.unique(mask, return_index=True)[1]
inds.sort()
ret[self._sparse_position] = inds.tolist()

return tuple(ret)

@property
def _dist_subfunc_gather_mask(self):
def _dist_subfunc_gather_mask(self, dmap=None):
"""
This method is analogous to :meth:`_dist_subfunc_scatter_mask`, although
the mask is now suitable to index into self's SubFunctions, rather
than into ``self.data``.
"""
return self._dist_gather_mask[self._sparse_position]
return self._dist_gather_mask(dmap=dmap)[self._sparse_position]

@property
def _dist_count(self):
def _dist_count(self, dmap=None):
"""
A 2-tuple of comm-sized iterables, which tells how many sparse points
is this MPI rank expected to send/receive to/from each other MPI rank.
"""
dmap = self._dist_datamap
dmap = dmap or self._dist_datamap
comm = self.grid.distributor.comm

ssparse = np.array([len(dmap.get(i, [])) for i in range(comm.size)], dtype=int)
Expand All @@ -217,13 +205,12 @@ def _dist_reorder_mask(self):
ret += tuple(i for i, d in enumerate(self.indices) if d is not self._sparse_dim)
return ret

@property
def _dist_alltoall(self):
def _dist_alltoall(self, dmap=None):
"""
The metadata necessary to perform an ``MPI_Alltoallv`` distributing the
sparse data values across the MPI ranks needing them.
"""
ssparse, rsparse = self._dist_count
ssparse, rsparse = self._dist_count(dmap=dmap)

# Per-rank shape of send/recv data
sshape = []
Expand Down Expand Up @@ -259,8 +246,7 @@ def _dist_alltoall(self):

return sshape, scount, sdisp, rshape, rcount, rdisp

@property
def _dist_subfunc_alltoall(self):
def _dist_subfunc_alltoall(self, dmap=None):
"""
The metadata necessary to perform an ``MPI_Alltoallv`` distributing
self's SubFunction values across the MPI ranks needing them.
Expand Down Expand Up @@ -620,11 +606,9 @@ def _index_matrix(self, offset):
def gridpoints(self):
if self.coordinates._data is None:
raise ValueError("No coordinates attached to this SparseFunction")
ret = []
for coords in self.coordinates.data._local:
ret.append(tuple(int(np.floor(c - o)/s) for c, o, s in
zip(coords, self.grid.origin, self.grid.spacing)))
return tuple(ret)
return (
np.floor(self.coordinates.data._local - self.grid.origin) / self.grid.spacing
).astype(int)

def guard(self, expr=None, offset=0):
"""
Expand Down Expand Up @@ -676,9 +660,9 @@ def _decomposition(self):
mapper = {self._sparse_dim: self._distributor.decomposition[self._sparse_dim]}
return tuple(mapper.get(d) for d in self.dimensions)

@property
def _dist_subfunc_alltoall(self):
ssparse, rsparse = self._dist_count
def _dist_subfunc_alltoall(self, dmap=None):
dmap = dmap or self._dist_datamap
ssparse, rsparse = self._dist_count(dmap=dmap)

# Per-rank shape of send/recv `coordinates`
sshape = [(i, self.grid.dim) for i in ssparse]
Expand Down Expand Up @@ -712,22 +696,28 @@ def _dist_scatter(self, data=None):
comm = distributor.comm
mpitype = MPI._typedict[np.dtype(self.dtype).char]

# Compute dist map only once
dmap = self._dist_datamap

# Pack sparse data values so that they can be sent out via an Alltoallv
data = data[self._dist_scatter_mask]
data = data[self._dist_scatter_mask(dmap=dmap)]
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))

# Send out the sparse point values
_, scount, sdisp, rshape, rcount, rdisp = self._dist_alltoall
_, scount, sdisp, rshape, rcount, rdisp = self._dist_alltoall(dmap=dmap)
scattered = np.empty(shape=rshape, dtype=self.dtype)
comm.Alltoallv([data, scount, sdisp, mpitype],
[scattered, rcount, rdisp, mpitype])
data = scattered

# Unpack data values so that they follow the expected storage layout
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))

# Pack (reordered) coordinates so that they can be sent out via an Alltoallv
coords = self.coordinates.data._local[self._dist_subfunc_scatter_mask]
coords = self.coordinates.data._local[self._dist_subfunc_scatter_mask(dmap=dmap)]

# Send out the sparse point coordinates
_, scount, sdisp, rshape, rcount, rdisp = self._dist_subfunc_alltoall
_, scount, sdisp, rshape, rcount, rdisp = self._dist_subfunc_alltoall(dmap=dmap)
scattered = np.empty(shape=rshape, dtype=self.coordinates.dtype)
comm.Alltoallv([coords, scount, sdisp, mpitype],
[scattered, rcount, rdisp, mpitype])
Expand All @@ -747,28 +737,33 @@ def _dist_gather(self, data, coords):

comm = distributor.comm

# Compute dist map only once
dmap = self._dist_datamap

# Pack sparse data values so that they can be sent out via an Alltoallv
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))
# Send back the sparse point values
sshape, scount, sdisp, _, rcount, rdisp = self._dist_alltoall
sshape, scount, sdisp, _, rcount, rdisp = self._dist_alltoall(dmap=dmap)
gathered = np.empty(shape=sshape, dtype=self.dtype)
mpitype = MPI._typedict[np.dtype(self.dtype).char]
comm.Alltoallv([data, rcount, rdisp, mpitype],
[gathered, scount, sdisp, mpitype])
# Unpack data values so that they follow the expected storage layout
gathered = np.ascontiguousarray(np.transpose(gathered, self._dist_reorder_mask))
self._data[:] = gathered[self._dist_gather_mask]
self._data[:] = gathered[self._dist_gather_mask(dmap=dmap)]

if coords is not None:
# Pack (reordered) coordinates so that they can be sent out via an Alltoallv
coords = coords + np.array(self.grid.origin_offset, dtype=self.dtype)
# Send out the sparse point coordinates
sshape, scount, sdisp, _, rcount, rdisp = self._dist_subfunc_alltoall
sshape, scount, sdisp, _, rcount, rdisp = \
self._dist_subfunc_alltoall(dmap=dmap)
gathered = np.empty(shape=sshape, dtype=self.coordinates.dtype)
mpitype = MPI._typedict[np.dtype(self.coordinates.dtype).char]
comm.Alltoallv([coords, rcount, rdisp, mpitype],
[gathered, scount, sdisp, mpitype])
self._coordinates.data._local[:] = gathered[self._dist_subfunc_gather_mask]
self._coordinates.data._local[:] = \
gathered[self._dist_subfunc_gather_mask(dmap=dmap)]

# Note: this method "mirrors" `_dist_scatter`: a sparse point that is sent
# in `_dist_scatter` is here received; a sparse point that is received in
Expand Down
4 changes: 2 additions & 2 deletions tests/test_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ def test_ownership(self, shape, coords, points):
# The domain decomposition is so that the i-th MPI rank gets exactly one
# sparse point `p` and, incidentally, `p` is logically owned by `i`
assert len(sf.gridpoints) == points
assert all(grid.distributor.glb_to_rank(i)[0] == grid.distributor.myrank
for i in sf.gridpoints)
ownership = grid.distributor.glb_to_rank(sf.gridpoints)
assert list(ownership.keys()) == [grid.distributor.myrank]

@pytest.mark.parallel(mode=4)
@pytest.mark.parametrize('coords,expected', [
Expand Down

0 comments on commit f53c45a

Please sign in to comment.