Skip to content

Commit

Permalink
mpi: move distribution from property to method taking the map as input
Browse files Browse the repository at this point in the history
  • Loading branch information
mloubout committed Jul 20, 2021
1 parent 5b69ef4 commit 5b3d994
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 54 deletions.
10 changes: 8 additions & 2 deletions devito/mpi/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,19 @@ def glb_to_rank(self, index):
The index, or array of indices, for which the owning MPI rank(s) is
retrieved.
"""
# assert isinstance(index, np.array)
assert isinstance(index, np.ndarray)
if index.shape[0] == 0:
return None
elif sum(index.shape) == 1:
return index
print(index.shape, self.ndim)
assert index.shape[1] == self.ndim

# Add singleton dimension at the end if only single gridpoint is passed
# instead of support.
if len(index.shape) == 2:
index = np.expand_dims(index, axis=2)

ret = {}

for r, j in enumerate(self.all_ranges):
Expand All @@ -329,7 +336,6 @@ def glb_to_rank(self, index):
ret[r] = filter_ordered(inds[0])
return ret


@property
def neighborhood(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion devito/passes/clusters/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections import Iterable
from collections.abc import Iterable

from devito.symbolics import uxreplace
from devito.tools import flatten, timed_pass
Expand Down
3 changes: 0 additions & 3 deletions devito/types/dense.py
Original file line number Diff line number Diff line change
Expand Up @@ -1452,9 +1452,6 @@ def _arg_values(self, **kwargs):
def parent(self):
return self._parent

def __hash__(self):
return hash(self.data._local.tobytes())

_pickle_kwargs = Function._pickle_kwargs + ['parent']


Expand Down
83 changes: 39 additions & 44 deletions devito/types/sparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
from devito.types.utils import IgnoreDimSort


import time

__all__ = ['SparseFunction', 'SparseTimeFunction', 'PrecomputedSparseFunction',
'PrecomputedSparseTimeFunction', 'MatrixSparseTimeFunction']

Expand Down Expand Up @@ -127,7 +125,7 @@ def _support(self):
injection/interpolation operators.
"""
max_shape = np.array(self.grid.shape).reshape(1, self.grid.dim)
minmax = lambda arr : np.minimum(max_shape, np.maximum(0, arr))
minmax = lambda arr: np.minimum(max_shape, np.maximum(0, arr))
return np.stack([minmax(self.gridpoints + s) for s in self._point_support],
axis=2)

Expand All @@ -139,62 +137,57 @@ def _dist_datamap(self):
dmap = self.grid.distributor.glb_to_rank(self._support)
return dmap or {}

@property
def _dist_scatter_mask(self):
def _dist_scatter_mask(self, dmap=None):
"""
A mask to index into ``self.data``, which creates a new data array that
logically contains N consecutive groups of sparse data values, where N
is the number of MPI ranks. The i-th group contains the sparse data
values accessible by the i-th MPI rank. Thus, sparse data values along
the boundary of two or more MPI ranks are duplicated.
"""
dmap = self._dist_datamap
dmap = dmap or self._dist_datamap
mask = np.array(flatten(dmap[i] for i in sorted(dmap)), dtype=int)
ret = [slice(None) for i in range(self.ndim)]
ret = [slice(None) for _ in range(self.ndim)]
ret[self._sparse_position] = mask
return tuple(ret)

@property
def _dist_subfunc_scatter_mask(self):
def _dist_subfunc_scatter_mask(self, dmap=None):
"""
This method is analogous to :meth:`_dist_scatter_mask`, although
the mask is now suitable to index into self's SubFunctions, rather
than into ``self.data``.
"""
return self._dist_scatter_mask[self._sparse_position]
return self._dist_scatter_mask(dmap=dmap)[self._sparse_position]

@property
def _dist_gather_mask(self):
def _dist_gather_mask(self, dmap=None):
"""
A mask to index into the ``data`` received upon returning from
``self._dist_alltoall``. This mask creates a new data array in which
duplicate sparse data values have been discarded. The resulting data
array can thus be used to populate ``self.data``.
"""
ret = list(self._dist_scatter_mask)
ret = list(self._dist_scatter_mask(dmap=dmap))
mask = ret[self._sparse_position]
inds = np.unique(mask, return_index=True)[1]
inds.sort()
ret[self._sparse_position] = inds.tolist()

return tuple(ret)

@property
def _dist_subfunc_gather_mask(self):
def _dist_subfunc_gather_mask(self, dmap=None):
"""
This method is analogous to :meth:`_dist_subfunc_scatter_mask`, although
the mask is now suitable to index into self's SubFunctions, rather
than into ``self.data``.
"""
return self._dist_gather_mask[self._sparse_position]
return self._dist_gather_mask(dmap=dmap)[self._sparse_position]

@property
def _dist_count(self):
def _dist_count(self, dmap=None):
"""
A 2-tuple of comm-sized iterables, which tells how many sparse points
is this MPI rank expected to send/receive to/from each other MPI rank.
"""
dmap = self._dist_datamap
dmap = dmap or self._dist_datamap
comm = self.grid.distributor.comm

ssparse = np.array([len(dmap.get(i, [])) for i in range(comm.size)], dtype=int)
Expand All @@ -212,13 +205,12 @@ def _dist_reorder_mask(self):
ret += tuple(i for i, d in enumerate(self.indices) if d is not self._sparse_dim)
return ret

@property
def _dist_alltoall(self):
def _dist_alltoall(self, dmap=None):
"""
The metadata necessary to perform an ``MPI_Alltoallv`` distributing the
sparse data values across the MPI ranks needing them.
"""
ssparse, rsparse = self._dist_count
ssparse, rsparse = self._dist_count(dmap=dmap)

# Per-rank shape of send/recv data
sshape = []
Expand Down Expand Up @@ -254,8 +246,7 @@ def _dist_alltoall(self):

return sshape, scount, sdisp, rshape, rcount, rdisp

@property
def _dist_subfunc_alltoall(self):
def _dist_subfunc_alltoall(self, dmap=None):
"""
The metadata necessary to perform an ``MPI_Alltoallv`` distributing
self's SubFunction values across the MPI ranks needing them.
Expand Down Expand Up @@ -285,13 +276,11 @@ def _arg_defaults(self, alias=None):

# Add in the sparse data (as well as any SubFunction data) belonging to
# self's local domain only
print(self)
t0 = time.process_time()
for k, v in self._dist_scatter().items():
args[mapper[k].name] = v
for i, s in zip(mapper[k].indices, v.shape):
args.update(i._arg_defaults(_min=0, size=s))
print(time.process_time() - t0)

return args

def _eval_at(self, func):
Expand Down Expand Up @@ -617,13 +606,8 @@ def _index_matrix(self, offset):
def gridpoints(self):
if self.coordinates._data is None:
raise ValueError("No coordinates attached to this SparseFunction")
return self._gridpoints(coords=self.coordinates)

@memoized_meth
def _gridpoints(self, coords=None):
coords = coords or self.coordinates
return (
np.floor(coords.data._local - self.grid.origin) / self.grid.spacing
np.floor(self.coordinates.data._local - self.grid.origin) / self.grid.spacing
).astype(int)

def guard(self, expr=None, offset=0):
Expand Down Expand Up @@ -676,9 +660,9 @@ def _decomposition(self):
mapper = {self._sparse_dim: self._distributor.decomposition[self._sparse_dim]}
return tuple(mapper.get(d) for d in self.dimensions)

@property
def _dist_subfunc_alltoall(self):
ssparse, rsparse = self._dist_count
def _dist_subfunc_alltoall(self, dmap=None):
dmap = dmap or self._dist_datamap
ssparse, rsparse = self._dist_count(dmap=dmap)

# Per-rank shape of send/recv `coordinates`
sshape = [(i, self.grid.dim) for i in ssparse]
Expand Down Expand Up @@ -712,22 +696,28 @@ def _dist_scatter(self, data=None):
comm = distributor.comm
mpitype = MPI._typedict[np.dtype(self.dtype).char]

# Compute dist map only once
dmap = self._dist_datamap

# Pack sparse data values so that they can be sent out via an Alltoallv
data = data[self._dist_scatter_mask]
data = data[self._dist_scatter_mask(dmap=dmap)]
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))

# Send out the sparse point values
_, scount, sdisp, rshape, rcount, rdisp = self._dist_alltoall
_, scount, sdisp, rshape, rcount, rdisp = self._dist_alltoall(dmap=dmap)
scattered = np.empty(shape=rshape, dtype=self.dtype)
comm.Alltoallv([data, scount, sdisp, mpitype],
[scattered, rcount, rdisp, mpitype])
data = scattered

# Unpack data values so that they follow the expected storage layout
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))

# Pack (reordered) coordinates so that they can be sent out via an Alltoallv
coords = self.coordinates.data._local[self._dist_subfunc_scatter_mask]
coords = self.coordinates.data._local[self._dist_subfunc_scatter_mask(dmap=dmap)]

# Send out the sparse point coordinates
_, scount, sdisp, rshape, rcount, rdisp = self._dist_subfunc_alltoall
_, scount, sdisp, rshape, rcount, rdisp = self._dist_subfunc_alltoall(dmap=dmap)
scattered = np.empty(shape=rshape, dtype=self.coordinates.dtype)
comm.Alltoallv([coords, scount, sdisp, mpitype],
[scattered, rcount, rdisp, mpitype])
Expand All @@ -747,28 +737,33 @@ def _dist_gather(self, data, coords):

comm = distributor.comm

# Compute dist map only once
dmap = self._dist_datamap

# Pack sparse data values so that they can be sent out via an Alltoallv
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))
# Send back the sparse point values
sshape, scount, sdisp, _, rcount, rdisp = self._dist_alltoall
sshape, scount, sdisp, _, rcount, rdisp = self._dist_alltoall(dmap=dmap)
gathered = np.empty(shape=sshape, dtype=self.dtype)
mpitype = MPI._typedict[np.dtype(self.dtype).char]
comm.Alltoallv([data, rcount, rdisp, mpitype],
[gathered, scount, sdisp, mpitype])
# Unpack data values so that they follow the expected storage layout
gathered = np.ascontiguousarray(np.transpose(gathered, self._dist_reorder_mask))
self._data[:] = gathered[self._dist_gather_mask]
self._data[:] = gathered[self._dist_gather_mask(dmap=dmap)]

if coords is not None:
# Pack (reordered) coordinates so that they can be sent out via an Alltoallv
coords = coords + np.array(self.grid.origin_offset, dtype=self.dtype)
# Send out the sparse point coordinates
sshape, scount, sdisp, _, rcount, rdisp = self._dist_subfunc_alltoall
sshape, scount, sdisp, _, rcount, rdisp = \
self._dist_subfunc_alltoall(dmap=dmap)
gathered = np.empty(shape=sshape, dtype=self.coordinates.dtype)
mpitype = MPI._typedict[np.dtype(self.coordinates.dtype).char]
comm.Alltoallv([coords, rcount, rdisp, mpitype],
[gathered, scount, sdisp, mpitype])
self._coordinates.data._local[:] = gathered[self._dist_subfunc_gather_mask]
self._coordinates.data._local[:] = \
gathered[self._dist_subfunc_gather_mask(dmap=dmap)]

# Note: this method "mirrors" `_dist_scatter`: a sparse point that is sent
# in `_dist_scatter` is here received; a sparse point that is received in
Expand Down
4 changes: 2 additions & 2 deletions examples/seismic/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
def setup_geometry(model, tn, f0=0.010):
# Source and receiver geometries
src_coordinates = np.empty((1, model.dim))
src_coordinates[0, :] = np.array(model.domain_size) * .45
src_coordinates[0, :] = np.array(model.domain_size) * .5
if model.dim > 1:
src_coordinates[0, -1] = model.origin[-1] + model.spacing[-1]

Expand All @@ -25,7 +25,7 @@ def setup_geometry(model, tn, f0=0.010):


def setup_rec_coords(model):
nrecx = model.shape[0] - 1
nrecx = model.shape[0]
recx = np.linspace(model.origin[0], model.domain_size[0], nrecx)

if model.dim == 1:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ def test_ownership(self, shape, coords, points):
# The domain decomposition is so that the i-th MPI rank gets exactly one
# sparse point `p` and, incidentally, `p` is logically owned by `i`
assert len(sf.gridpoints) == points
assert all(grid.distributor.glb_to_rank(i)[0] == grid.distributor.myrank
for i in sf.gridpoints)
ownership = grid.distributor.glb_to_rank(sf.gridpoints)
assert list(ownership.keys()) == [grid.distributor.myrank]

@pytest.mark.parallel(mode=4)
@pytest.mark.parametrize('coords,expected', [
Expand Down

0 comments on commit 5b3d994

Please sign in to comment.