Skip to content

Commit

Permalink
mpi: move distribution from property to method taking the map as input
Browse files Browse the repository at this point in the history
  • Loading branch information
mloubout committed Jul 2, 2021
1 parent 3d425fa commit bf680e0
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 51 deletions.
3 changes: 1 addition & 2 deletions devito/mpi/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def glb_to_rank(self, index):
The index, or array of indices, for which the owning MPI rank(s) is
retrieved.
"""
# assert isinstance(index, np.array)
assert isinstance(index, np.ndarray)
if index.shape[0] == 0:
return None
elif sum(index.shape) == 1:
Expand All @@ -329,7 +329,6 @@ def glb_to_rank(self, index):
ret[r] = filter_ordered(inds[0])
return ret


@property
def neighborhood(self):
"""
Expand Down
3 changes: 0 additions & 3 deletions devito/types/dense.py
Original file line number Diff line number Diff line change
Expand Up @@ -1452,9 +1452,6 @@ def _arg_values(self, **kwargs):
def parent(self):
return self._parent

def __hash__(self):
return hash(self.data._local.tobytes())

_pickle_kwargs = Function._pickle_kwargs + ['parent']


Expand Down
83 changes: 39 additions & 44 deletions devito/types/sparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
from devito.types.basic import Symbol
from devito.types.equation import Eq, Inc

import time

__all__ = ['SparseFunction', 'SparseTimeFunction', 'PrecomputedSparseFunction',
'PrecomputedSparseTimeFunction', 'MatrixSparseTimeFunction']

Expand Down Expand Up @@ -125,7 +123,7 @@ def _support(self):
injection/interpolation operators.
"""
max_shape = np.array(self.grid.shape).reshape(1, self.grid.dim)
minmax = lambda arr : np.minimum(max_shape, np.maximum(0, arr))
minmax = lambda arr: np.minimum(max_shape, np.maximum(0, arr))
return np.stack([minmax(self.gridpoints + s) for s in self._point_support],
axis=2)

Expand All @@ -137,62 +135,57 @@ def _dist_datamap(self):
dmap = self.grid.distributor.glb_to_rank(self._support)
return dmap or {}

@property
def _dist_scatter_mask(self):
def _dist_scatter_mask(self, dmap=None):
"""
A mask to index into ``self.data``, which creates a new data array that
logically contains N consecutive groups of sparse data values, where N
is the number of MPI ranks. The i-th group contains the sparse data
values accessible by the i-th MPI rank. Thus, sparse data values along
the boundary of two or more MPI ranks are duplicated.
"""
dmap = self._dist_datamap
dmap = dmap or self._dist_datamap
mask = np.array(flatten(dmap[i] for i in sorted(dmap)), dtype=int)
ret = [slice(None) for i in range(self.ndim)]
ret = [slice(None) for _ in range(self.ndim)]
ret[self._sparse_position] = mask
return tuple(ret)

@property
def _dist_subfunc_scatter_mask(self):
def _dist_subfunc_scatter_mask(self, dmap=None):
"""
This method is analogous to :meth:`_dist_scatter_mask`, although
the mask is now suitable to index into self's SubFunctions, rather
than into ``self.data``.
"""
return self._dist_scatter_mask[self._sparse_position]
return self._dist_scatter_mask(dmap=dmap)[self._sparse_position]

@property
def _dist_gather_mask(self):
def _dist_gather_mask(self, dmap=None):
"""
A mask to index into the ``data`` received upon returning from
``self._dist_alltoall``. This mask creates a new data array in which
duplicate sparse data values have been discarded. The resulting data
array can thus be used to populate ``self.data``.
"""
ret = list(self._dist_scatter_mask)
ret = list(self._dist_scatter_mask(dmap=dmap))
mask = ret[self._sparse_position]
inds = np.unique(mask, return_index=True)[1]
inds.sort()
ret[self._sparse_position] = inds.tolist()

return tuple(ret)

@property
def _dist_subfunc_gather_mask(self):
def _dist_subfunc_gather_mask(self, dmap=None):
"""
This method is analogous to :meth:`_dist_subfunc_scatter_mask`, although
the mask is now suitable to index into self's SubFunctions, rather
than into ``self.data``.
"""
return self._dist_gather_mask[self._sparse_position]
return self._dist_gather_mask(dmap=dmap)[self._sparse_position]

@property
def _dist_count(self):
def _dist_count(self, dmap=None):
"""
A 2-tuple of comm-sized iterables, which tells how many sparse points
is this MPI rank expected to send/receive to/from each other MPI rank.
"""
dmap = self._dist_datamap
dmap = dmap or self._dist_datamap
comm = self.grid.distributor.comm

ssparse = np.array([len(dmap.get(i, [])) for i in range(comm.size)], dtype=int)
Expand All @@ -210,13 +203,12 @@ def _dist_reorder_mask(self):
ret += tuple(i for i, d in enumerate(self.indices) if d is not self._sparse_dim)
return ret

@property
def _dist_alltoall(self):
def _dist_alltoall(self, dmap=None):
"""
The metadata necessary to perform an ``MPI_Alltoallv`` distributing the
sparse data values across the MPI ranks needing them.
"""
ssparse, rsparse = self._dist_count
ssparse, rsparse = self._dist_count(dmap=dmap)

# Per-rank shape of send/recv data
sshape = []
Expand Down Expand Up @@ -252,8 +244,7 @@ def _dist_alltoall(self):

return sshape, scount, sdisp, rshape, rcount, rdisp

@property
def _dist_subfunc_alltoall(self):
def _dist_subfunc_alltoall(self, dmap=None):
"""
The metadata necessary to perform an ``MPI_Alltoallv`` distributing
self's SubFunction values across the MPI ranks needing them.
Expand Down Expand Up @@ -283,13 +274,11 @@ def _arg_defaults(self, alias=None):

# Add in the sparse data (as well as any SubFunction data) belonging to
# self's local domain only
print(self)
t0 = time.process_time()
for k, v in self._dist_scatter().items():
args[mapper[k].name] = v
for i, s in zip(mapper[k].indices, v.shape):
args.update(i._arg_defaults(_min=0, size=s))
print(time.process_time() - t0)

return args

def _eval_at(self, func):
Expand Down Expand Up @@ -615,13 +604,8 @@ def _index_matrix(self, offset):
def gridpoints(self):
if self.coordinates._data is None:
raise ValueError("No coordinates attached to this SparseFunction")
return self._gridpoints(coords=self.coordinates)

@memoized_meth
def _gridpoints(self, coords=None):
coords = coords or self.coordinates
return (
np.floor(coords.data._local - self.grid.origin) / self.grid.spacing
np.floor(self.coordinates.data._local - self.grid.origin) / self.grid.spacing
).astype(int)

def guard(self, expr=None, offset=0):
Expand Down Expand Up @@ -674,9 +658,9 @@ def _decomposition(self):
mapper = {self._sparse_dim: self._distributor.decomposition[self._sparse_dim]}
return tuple(mapper.get(d) for d in self.dimensions)

@property
def _dist_subfunc_alltoall(self):
ssparse, rsparse = self._dist_count
def _dist_subfunc_alltoall(self, dmap=None):
dmap = dmap or self._dist_datamap
ssparse, rsparse = self._dist_count(dmap=dmap)

# Per-rank shape of send/recv `coordinates`
sshape = [(i, self.grid.dim) for i in ssparse]
Expand Down Expand Up @@ -710,22 +694,28 @@ def _dist_scatter(self, data=None):
comm = distributor.comm
mpitype = MPI._typedict[np.dtype(self.dtype).char]

# Compute dist map only once
dmap = self._dist_datamap

# Pack sparse data values so that they can be sent out via an Alltoallv
data = data[self._dist_scatter_mask]
data = data[self._dist_scatter_mask(dmap=dmap)]
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))

# Send out the sparse point values
_, scount, sdisp, rshape, rcount, rdisp = self._dist_alltoall
_, scount, sdisp, rshape, rcount, rdisp = self._dist_alltoall(dmap=dmap)
scattered = np.empty(shape=rshape, dtype=self.dtype)
comm.Alltoallv([data, scount, sdisp, mpitype],
[scattered, rcount, rdisp, mpitype])
data = scattered

# Unpack data values so that they follow the expected storage layout
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))

# Pack (reordered) coordinates so that they can be sent out via an Alltoallv
coords = self.coordinates.data._local[self._dist_subfunc_scatter_mask]
coords = self.coordinates.data._local[self._dist_subfunc_scatter_mask(dmap=dmap)]

# Send out the sparse point coordinates
_, scount, sdisp, rshape, rcount, rdisp = self._dist_subfunc_alltoall
_, scount, sdisp, rshape, rcount, rdisp = self._dist_subfunc_alltoall(dmap=dmap)
scattered = np.empty(shape=rshape, dtype=self.coordinates.dtype)
comm.Alltoallv([coords, scount, sdisp, mpitype],
[scattered, rcount, rdisp, mpitype])
Expand All @@ -745,28 +735,33 @@ def _dist_gather(self, data, coords):

comm = distributor.comm

# Compute dist map only once
dmap = self._dist_datamap

# Pack sparse data values so that they can be sent out via an Alltoallv
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))
# Send back the sparse point values
sshape, scount, sdisp, _, rcount, rdisp = self._dist_alltoall
sshape, scount, sdisp, _, rcount, rdisp = self._dist_alltoall(dmap=dmap)
gathered = np.empty(shape=sshape, dtype=self.dtype)
mpitype = MPI._typedict[np.dtype(self.dtype).char]
comm.Alltoallv([data, rcount, rdisp, mpitype],
[gathered, scount, sdisp, mpitype])
# Unpack data values so that they follow the expected storage layout
gathered = np.ascontiguousarray(np.transpose(gathered, self._dist_reorder_mask))
self._data[:] = gathered[self._dist_gather_mask]
self._data[:] = gathered[self._dist_gather_mask(dmap=dmap)]

if coords is not None:
# Pack (reordered) coordinates so that they can be sent out via an Alltoallv
coords = coords + np.array(self.grid.origin_offset, dtype=self.dtype)
# Send out the sparse point coordinates
sshape, scount, sdisp, _, rcount, rdisp = self._dist_subfunc_alltoall
sshape, scount, sdisp, _, rcount, rdisp = \
self._dist_subfunc_alltoall(dmap=dmap)
gathered = np.empty(shape=sshape, dtype=self.coordinates.dtype)
mpitype = MPI._typedict[np.dtype(self.coordinates.dtype).char]
comm.Alltoallv([coords, rcount, rdisp, mpitype],
[gathered, scount, sdisp, mpitype])
self._coordinates.data._local[:] = gathered[self._dist_subfunc_gather_mask]
self._coordinates.data._local[:] = \
gathered[self._dist_subfunc_gather_mask(dmap=dmap)]

# Note: this method "mirrors" `_dist_scatter`: a sparse point that is sent
# in `_dist_scatter` is here received; a sparse point that is received in
Expand Down
4 changes: 2 additions & 2 deletions examples/seismic/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
def setup_geometry(model, tn, f0=0.010):
# Source and receiver geometries
src_coordinates = np.empty((1, model.dim))
src_coordinates[0, :] = np.array(model.domain_size) * .45
src_coordinates[0, :] = np.array(model.domain_size) * .5
if model.dim > 1:
src_coordinates[0, -1] = model.origin[-1] + model.spacing[-1]

Expand All @@ -25,7 +25,7 @@ def setup_geometry(model, tn, f0=0.010):


def setup_rec_coords(model):
nrecx = model.shape[0] - 1
nrecx = model.shape[0]
recx = np.linspace(model.origin[0], model.domain_size[0], nrecx)

if model.dim == 1:
Expand Down

0 comments on commit bf680e0

Please sign in to comment.