Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mpi: Mitigate SparseFunction setup costs #1720

Merged
merged 3 commits into from
Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 24 additions & 21 deletions devito/mpi/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from devito.data import LEFT, CENTER, RIGHT, Decomposition
from devito.parameters import configuration
from devito.tools import EnrichedTuple, as_tuple, ctypes_to_cstr, is_integer
from devito.tools import EnrichedTuple, as_tuple, ctypes_to_cstr, filter_ordered
from devito.types import CompositeObject, Object


Expand Down Expand Up @@ -308,29 +308,32 @@ def glb_to_rank(self, index):

Parameters
----------
index : int or list of ints
The index, or list of indices, for which the owning MPI rank(s) is
index : array of ints
The index, or array of indices, for which the owning MPI rank(s) is
retrieved.
"""
assert isinstance(index, (tuple, list))
if len(index) == 0:
assert isinstance(index, np.ndarray)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this fixes the python 3.9 issue as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly — where is the issue for that again?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think no issue was opened, need to check

if index.shape[0] == 0:
return None
elif is_integer(index[0]):
# `index` is a single point
indices = [index]
else:
indices = index
ret = []
for i in indices:
assert len(i) == self.ndim
found = False
for r, j in enumerate(self.all_ranges):
if all(v in j[d] for v, d in zip(i, self.dimensions)):
ret.append(r)
found = True
break
assert found
return as_tuple(ret)
elif sum(index.shape) == 1:
return index

assert index.shape[1] == self.ndim

# Add singleton dimension at the end if only single gridpoint is passed
# instead of support.
if len(index.shape) == 2:
index = np.expand_dims(index, axis=2)

ret = {}
for r, j in enumerate(self.all_ranges):
mins = np.array([b[0] for b in j]).reshape(1, -1, 1)
FabioLuporini marked this conversation as resolved.
Show resolved Hide resolved
maxs = np.array([b[-1] for b in j]).reshape(1, -1, 1)
inds = np.where(((index <= maxs) & (index >= mins)).all(axis=1))
if inds[0].size == 0:
continue
ret[r] = filter_ordered(inds[0])
FabioLuporini marked this conversation as resolved.
Show resolved Hide resolved
return ret

@property
def neighborhood(self):
Expand Down
2 changes: 1 addition & 1 deletion devito/passes/clusters/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections import Iterable
from collections.abc import Iterable

from devito.symbolics import uxreplace
from devito.tools import flatten, timed_pass
Expand Down
103 changes: 49 additions & 54 deletions devito/types/sparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,92 +113,80 @@ def inject(self, *args, **kwargs):
"""
return self.interpolator.inject(*args, **kwargs)

@cached_property
def _point_support(self):
return np.array(tuple(product(range(-self._radius + 1, self._radius + 1),
repeat=self.grid.dim)))

@property
def _support(self):
"""
The grid points surrounding each sparse point within the radius of self's
injection/interpolation operators.
"""
ret = []
for i in self.gridpoints:
support = [range(max(0, j - self._radius + 1), min(M, j + self._radius + 1))
for j, M in zip(i, self.grid.shape)]
ret.append(tuple(product(*support)))
return tuple(ret)
max_shape = np.array(self.grid.shape).reshape(1, self.grid.dim)
minmax = lambda arr: np.minimum(max_shape, np.maximum(0, arr))
FabioLuporini marked this conversation as resolved.
Show resolved Hide resolved
return np.stack([minmax(self.gridpoints + s) for s in self._point_support],
axis=2)

@property
def _dist_datamap(self):
return self._build_dist_datamap(support=self._support)

@memoized_meth
def _build_dist_datamap(self, support=None):
"""
Mapper ``M : MPI rank -> required sparse data``.
"""
ret = {}
support = support or self._support
for i, s in enumerate(support):
# Sparse point `i` is "required" by the following ranks
for r in self.grid.distributor.glb_to_rank(s):
ret.setdefault(r, []).append(i)
return {k: filter_ordered(v) for k, v in ret.items()}
return self.grid.distributor.glb_to_rank(self._support) or {}

@property
def _dist_scatter_mask(self):
def _dist_scatter_mask(self, dmap=None):
"""
A mask to index into ``self.data``, which creates a new data array that
logically contains N consecutive groups of sparse data values, where N
is the number of MPI ranks. The i-th group contains the sparse data
values accessible by the i-th MPI rank. Thus, sparse data values along
the boundary of two or more MPI ranks are duplicated.
"""
dmap = self._dist_datamap
dmap = dmap or self._dist_datamap
mask = np.array(flatten(dmap[i] for i in sorted(dmap)), dtype=int)
ret = [slice(None) for i in range(self.ndim)]
ret = [slice(None) for _ in range(self.ndim)]
ret[self._sparse_position] = mask
return tuple(ret)

@property
def _dist_subfunc_scatter_mask(self):
def _dist_subfunc_scatter_mask(self, dmap=None):
"""
This method is analogous to :meth:`_dist_scatter_mask`, although
the mask is now suitable to index into self's SubFunctions, rather
than into ``self.data``.
"""
return self._dist_scatter_mask[self._sparse_position]
return self._dist_scatter_mask(dmap=dmap)[self._sparse_position]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is, essentially, to avoid recomputation of _dist_datamap when processing eg coordinates?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's called about 8 times, so computing it only once gains quite a bit.


@property
def _dist_gather_mask(self):
def _dist_gather_mask(self, dmap=None):
"""
A mask to index into the ``data`` received upon returning from
``self._dist_alltoall``. This mask creates a new data array in which
duplicate sparse data values have been discarded. The resulting data
array can thus be used to populate ``self.data``.
"""
ret = list(self._dist_scatter_mask)
ret = list(self._dist_scatter_mask(dmap=dmap))
mask = ret[self._sparse_position]
inds = np.unique(mask, return_index=True)[1]
inds.sort()
ret[self._sparse_position] = inds.tolist()

return tuple(ret)

@property
def _dist_subfunc_gather_mask(self):
def _dist_subfunc_gather_mask(self, dmap=None):
"""
This method is analogous to :meth:`_dist_subfunc_scatter_mask`, although
the mask is now suitable to index into self's SubFunctions, rather
than into ``self.data``.
"""
return self._dist_gather_mask[self._sparse_position]
return self._dist_gather_mask(dmap=dmap)[self._sparse_position]

@property
def _dist_count(self):
def _dist_count(self, dmap=None):
"""
A 2-tuple of comm-sized iterables, which tells how many sparse points
is this MPI rank expected to send/receive to/from each other MPI rank.
"""
dmap = self._dist_datamap
dmap = dmap or self._dist_datamap
comm = self.grid.distributor.comm

ssparse = np.array([len(dmap.get(i, [])) for i in range(comm.size)], dtype=int)
Expand All @@ -216,13 +204,12 @@ def _dist_reorder_mask(self):
ret += tuple(i for i, d in enumerate(self.indices) if d is not self._sparse_dim)
return ret

@property
def _dist_alltoall(self):
def _dist_alltoall(self, dmap=None):
"""
The metadata necessary to perform an ``MPI_Alltoallv`` distributing the
sparse data values across the MPI ranks needing them.
"""
ssparse, rsparse = self._dist_count
ssparse, rsparse = self._dist_count(dmap=dmap)

# Per-rank shape of send/recv data
sshape = []
Expand Down Expand Up @@ -258,8 +245,7 @@ def _dist_alltoall(self):

return sshape, scount, sdisp, rshape, rcount, rdisp

@property
def _dist_subfunc_alltoall(self):
def _dist_subfunc_alltoall(self, dmap=None):
"""
The metadata necessary to perform an ``MPI_Alltoallv`` distributing
self's SubFunction values across the MPI ranks needing them.
Expand Down Expand Up @@ -619,11 +605,9 @@ def _index_matrix(self, offset):
def gridpoints(self):
if self.coordinates._data is None:
raise ValueError("No coordinates attached to this SparseFunction")
ret = []
for coords in self.coordinates.data._local:
ret.append(tuple(int(np.floor(c - o)/s) for c, o, s in
zip(coords, self.grid.origin, self.grid.spacing)))
return tuple(ret)
return (
np.floor(self.coordinates.data._local - self.grid.origin) / self.grid.spacing
).astype(int)

def guard(self, expr=None, offset=0):
"""
Expand Down Expand Up @@ -675,9 +659,9 @@ def _decomposition(self):
mapper = {self._sparse_dim: self._distributor.decomposition[self._sparse_dim]}
return tuple(mapper.get(d) for d in self.dimensions)

@property
def _dist_subfunc_alltoall(self):
ssparse, rsparse = self._dist_count
def _dist_subfunc_alltoall(self, dmap=None):
dmap = dmap or self._dist_datamap
ssparse, rsparse = self._dist_count(dmap=dmap)

# Per-rank shape of send/recv `coordinates`
sshape = [(i, self.grid.dim) for i in ssparse]
Expand Down Expand Up @@ -711,22 +695,28 @@ def _dist_scatter(self, data=None):
comm = distributor.comm
mpitype = MPI._typedict[np.dtype(self.dtype).char]

# Compute dist map only once
dmap = self._dist_datamap

# Pack sparse data values so that they can be sent out via an Alltoallv
data = data[self._dist_scatter_mask]
data = data[self._dist_scatter_mask(dmap=dmap)]
FabioLuporini marked this conversation as resolved.
Show resolved Hide resolved
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))

# Send out the sparse point values
_, scount, sdisp, rshape, rcount, rdisp = self._dist_alltoall
_, scount, sdisp, rshape, rcount, rdisp = self._dist_alltoall(dmap=dmap)
scattered = np.empty(shape=rshape, dtype=self.dtype)
comm.Alltoallv([data, scount, sdisp, mpitype],
[scattered, rcount, rdisp, mpitype])
data = scattered

# Unpack data values so that they follow the expected storage layout
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))

# Pack (reordered) coordinates so that they can be sent out via an Alltoallv
coords = self.coordinates.data._local[self._dist_subfunc_scatter_mask]
coords = self.coordinates.data._local[self._dist_subfunc_scatter_mask(dmap=dmap)]

# Send out the sparse point coordinates
_, scount, sdisp, rshape, rcount, rdisp = self._dist_subfunc_alltoall
_, scount, sdisp, rshape, rcount, rdisp = self._dist_subfunc_alltoall(dmap=dmap)
scattered = np.empty(shape=rshape, dtype=self.coordinates.dtype)
comm.Alltoallv([coords, scount, sdisp, mpitype],
[scattered, rcount, rdisp, mpitype])
Expand All @@ -746,28 +736,33 @@ def _dist_gather(self, data, coords):

comm = distributor.comm

# Compute dist map only once
dmap = self._dist_datamap

# Pack sparse data values so that they can be sent out via an Alltoallv
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))
# Send back the sparse point values
sshape, scount, sdisp, _, rcount, rdisp = self._dist_alltoall
sshape, scount, sdisp, _, rcount, rdisp = self._dist_alltoall(dmap=dmap)
gathered = np.empty(shape=sshape, dtype=self.dtype)
mpitype = MPI._typedict[np.dtype(self.dtype).char]
comm.Alltoallv([data, rcount, rdisp, mpitype],
[gathered, scount, sdisp, mpitype])
# Unpack data values so that they follow the expected storage layout
gathered = np.ascontiguousarray(np.transpose(gathered, self._dist_reorder_mask))
self._data[:] = gathered[self._dist_gather_mask]
self._data[:] = gathered[self._dist_gather_mask(dmap=dmap)]

if coords is not None:
# Pack (reordered) coordinates so that they can be sent out via an Alltoallv
coords = coords + np.array(self.grid.origin_offset, dtype=self.dtype)
# Send out the sparse point coordinates
sshape, scount, sdisp, _, rcount, rdisp = self._dist_subfunc_alltoall
sshape, scount, sdisp, _, rcount, rdisp = \
self._dist_subfunc_alltoall(dmap=dmap)
gathered = np.empty(shape=sshape, dtype=self.coordinates.dtype)
mpitype = MPI._typedict[np.dtype(self.coordinates.dtype).char]
comm.Alltoallv([coords, rcount, rdisp, mpitype],
[gathered, scount, sdisp, mpitype])
self._coordinates.data._local[:] = gathered[self._dist_subfunc_gather_mask]
self._coordinates.data._local[:] = \
gathered[self._dist_subfunc_gather_mask(dmap=dmap)]

# Note: this method "mirrors" `_dist_scatter`: a sparse point that is sent
# in `_dist_scatter` is here received; a sparse point that is received in
Expand Down
4 changes: 2 additions & 2 deletions tests/test_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ def test_ownership(self, shape, coords, points):
# The domain decomposition is so that the i-th MPI rank gets exactly one
# sparse point `p` and, incidentally, `p` is logically owned by `i`
assert len(sf.gridpoints) == points
assert all(grid.distributor.glb_to_rank(i)[0] == grid.distributor.myrank
for i in sf.gridpoints)
ownership = grid.distributor.glb_to_rank(sf.gridpoints)
assert list(ownership.keys()) == [grid.distributor.myrank]

@pytest.mark.parallel(mode=4)
@pytest.mark.parametrize('coords,expected', [
Expand Down