mpi: Fix data distribution bugs [part 2] #1949

Merged: 10 commits merged into master from fix_mpi_slicing_p2 on Sep 14, 2022
Conversation

@rhodrin (Contributor) commented on Jun 22, 2022

Fix for issues #1862 and #1892 + associated tests.

@rhodrin added the bug-py, WIP (still work in progress) and MPI (mpi-related) labels on Jun 22, 2022
codecov bot commented on Jun 22, 2022

Codecov Report

Merging #1949 (29112c5) into master (12995d3) will increase coverage by 0.00%.
The diff coverage is 92.68%.

@@           Coverage Diff            @@
##           master    #1949    +/-   ##
========================================
  Coverage   87.90%   87.91%            
========================================
  Files         214      214            
  Lines       36504    36613   +109     
  Branches     5513     5538    +25     
========================================
+ Hits        32090    32189    +99     
- Misses       3897     3907    +10     
  Partials      517      517            
Impacted Files              Coverage Δ
devito/data/utils.py        89.12% <18.18%> (-3.46%) ⬇️
devito/data/data.py         95.79% <100.00%> (+0.62%) ⬆️
tests/conftest.py           84.41% <100.00%> (-0.11%) ⬇️
tests/test_data.py          97.87% <100.00%> (+0.11%) ⬆️
devito/ir/support/basic.py  91.98% <0.00%> (-0.22%) ⬇️


devito/data/data.py (review thread resolved)
@@ -366,6 +420,11 @@ def _normalize_index(self, idx):

def _process_args(self, idx, val):
"""If comm_type is parallel we need to first retrieve local unflipped data."""
if (len(as_tuple(idx)) < len(val.shape)) and (len(val.shape) <= len(self.shape)):
Contributor:

This "looks" expensive, or am I wrong?
Is _process_args called for all points?

@FabioLuporini (Contributor) left a comment:

Great! Only minor comments left.

def _prune_shape(self, shape):
    # Reduce distributed MPI `Data`'s shape to that of an equivalently
    # sliced numpy array.
    decomposition = tuple([d for d in self._decomposition if d.size > 1])
Contributor:

Nitpicking: tuple(d for d in ...) is fine -- you don't need the inner list comprehension, tuple([...]).
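For context, a minimal plain-Python illustration of the nitpick (the sizes list below is made up, not Devito data): passing a generator expression to tuple() builds the same result without the intermediate list.

```python
# Plain-Python illustration: both forms produce the same tuple, but the
# generator expression skips the temporary list.
sizes = [1, 4, 1, 8]

with_list = tuple([s for s in sizes if s > 1])    # builds a list first
with_genexp = tuple(s for s in sizes if s > 1)    # feeds tuple() directly

assert with_list == with_genexp == (4, 8)
```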

def _check_idx(func):
"""Check if __getitem__/__setitem__ may require communication across MPI ranks."""
@wraps(func)
def wrapper(data, *args, **kwargs):
glb_idx = args[0]
if len(args) > 1 and isinstance(args[1], Data) \
is_gather = True if (kwargs and isinstance(kwargs['gather_rank'], int)) \
Contributor:

is_gather = kwargs and isinstance(...)

@@ -190,7 +202,45 @@ def __str__(self):
def __getitem__(self, glb_idx, comm_type, gather_rank=None):
    loc_idx = self._index_glb_to_loc(glb_idx)
    is_gather = True if isinstance(gather_rank, int) else False
Contributor:

is_gather = isinstance(...)

devito/data/utils.py (review thread resolved)
if reshape and (0 not in reshape) and (reshape != retval.shape):
    return retval.reshape(reshape)
if not is_gather:
    newshape = tuple([s for s, i in zip(retval.shape, loc_idx)
Contributor:

As before, you may drop the [ ].

@@ -190,7 +202,45 @@ def __str__(self):
def __getitem__(self, glb_idx, comm_type, gather_rank=None):
    loc_idx = self._index_glb_to_loc(glb_idx)
    is_gather = True if isinstance(gather_rank, int) else False
    if comm_type is index_by_index or is_gather:
        if is_gather and comm_type == gather:
Contributor:

comm_type is gather
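A generic illustration of the `is` vs `==` point (plain Python, not the Devito sources; CommType and the two sentinels below are made up): for sentinel-style singletons, identity comparison states the intent directly and cannot be fooled by a custom __eq__.

```python
# Generic sketch: comparing against sentinel objects with `is`.
class CommType:
    """Stand-in for a named communication-type sentinel."""
    def __init__(self, name):
        self.name = name

gather = CommType('gather')
index_by_index = CommType('index_by_index')

comm_type = gather

assert comm_type is gather              # identity: the very same object
assert comm_type is not index_by_index  # a different sentinel
```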

# sliced numpy array.
decomposition = tuple([d for d in self._decomposition if d.size > 1])
retval = self.reshape(shape)
retval._decomposition = decomposition
Contributor:

reshape creates a new Data, I presume, so why do we have to reset the _decomposition here? I would expect this to happen in the constructor, or at least in __array_finalize__, so why isn't that the case?

Contributor Author (@rhodrin):

After a _reshape, the _decomposition is set to None.

As I mentioned previously, I was thinking of overriding reshape for our Data type, but that turned out to be a bit of a mess. These 'reductions' are very specific, which is why we can get away with the above. We can maybe talk about this a bit more, though.

Contributor:

Doesn't numpy have a no-copy reshape with view?

Contributor Author (@rhodrin):

If able, the method returns a view. From the numpy documentation:

"This will be a new view object if possible; otherwise, it will be a copy. Note there is no guarantee of the memory layout (C- or Fortran- contiguous) of the returned array."
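A minimal standalone sketch of the mechanics (plain numpy, not Devito's Data; TaggedArray and _tag are made-up stand-ins): reshape hands back a new array object, usually a view of the same buffer, and whatever extra attribute that new object carries is decided by __array_finalize__.

```python
import numpy as np

class TaggedArray(np.ndarray):
    """Toy ndarray subclass with one extra attribute, standing in for Data."""

    def __new__(cls, shape, tag=None):
        obj = np.zeros(shape).view(cls)
        obj._tag = tag
        return obj

    def __array_finalize__(self, obj):
        # Runs for views and reshapes too; propagate the parent's tag,
        # defaulting to None when there is nothing to inherit.
        self._tag = getattr(obj, '_tag', None)

a = TaggedArray((4, 4), tag='decomposition-info')
b = a.reshape(2, 8)

print(np.shares_memory(a, b))  # True: a view, no data copy
print(b._tag)                  # 'decomposition-info', set via __array_finalize__
```

If __array_finalize__ instead dropped the attribute to None, the caller would have to restore it by hand, which is the situation described above.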

@@ -190,7 +202,45 @@ def __str__(self):
def __getitem__(self, glb_idx, comm_type, gather_rank=None):
    loc_idx = self._index_glb_to_loc(glb_idx)
    is_gather = True if isinstance(gather_rank, int) else False
    if comm_type is index_by_index or is_gather:
        if is_gather and comm_type == gather:
Contributor:

Refresh my mind please -- what does the user need to write so that we end up here?

Contributor Author (@rhodrin):

For MPI gathers, the decorator will infer how to gather based on the slice (or lack thereof) supplied by the user. (That is, gather vs index_by_index is set by the decorator, not by the user.)
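To make the dispatch concrete, here is a hypothetical sketch of the pattern being described; the names infer_comm_type, GATHER and INDEX_BY_INDEX are made up, and the real _check_idx decorator in devito/data/data.py is more involved.

```python
from functools import wraps

GATHER = 'gather'
INDEX_BY_INDEX = 'index_by_index'

def infer_comm_type(func):
    """Pick a communication type from the index, so the caller never does."""
    @wraps(func)
    def wrapper(data, glb_idx, **kwargs):
        gather_rank = kwargs.get('gather_rank', None)
        if isinstance(gather_rank, int) and glb_idx == slice(None):
            comm_type = GATHER          # full-domain access with a target rank
        else:
            comm_type = INDEX_BY_INDEX  # anything fancier may need
                                        # point-by-point communication
        return func(data, glb_idx, comm_type, **kwargs)
    return wrapper

class Demo:
    @infer_comm_type
    def __getitem__(self, glb_idx, comm_type, gather_rank=None):
        return comm_type

print(Demo()[:])                                       # index_by_index
print(Demo().__getitem__(slice(None), gather_rank=0))  # gather
```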

return retval
else:
return None
elif comm_type is index_by_index or is_gather:
# Retrieve the pertinent local data prior to mpi send/receive operations
Contributor:

mpi -> MPI?

g.data[0, :, :] = dat1
f1.data[:] = g.data[0, ::-1, ::-1]
result = np.array(f1.data[:])
if LEFT in glb_pos_map[x] and LEFT in glb_pos_map[y]:
Contributor:

These lines could be done parametrically as well, but it's pretty clear this way; not sure parametrizing would be better.

def _check_idx(func):
"""Check if __getitem__/__setitem__ may require communication across MPI ranks."""
@wraps(func)
def wrapper(data, *args, **kwargs):
glb_idx = args[0]
if len(args) > 1 and isinstance(args[1], Data) \
is_gather = kwargs and isinstance(kwargs['gather_rank'], int)
Contributor:

Is it guaranteed to have gather_rank? Maybe is_gather = isinstance(kwargs.get('gather_rank', None), int)
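A quick plain-Python check of the difference: indexing kwargs raises when the keyword was never passed, while .get falls back to a default.

```python
# kwargs['gather_rank'] raises KeyError if the keyword is absent;
# kwargs.get(...) degrades gracefully instead.
def wrapper(**kwargs):
    return isinstance(kwargs.get('gather_rank', None), int)

print(wrapper(gather_rank=0))  # True  -- rank 0 still counts as a gather
print(wrapper())               # False -- no KeyError raised
```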

is_gather = True if isinstance(gather_rank, int) else False
if comm_type is index_by_index or is_gather:
is_gather = isinstance(gather_rank, int)
if is_gather and comm_type is gather:
Contributor:

Why not just if gather_rank and ...?

Contributor Author (@rhodrin):

0 is a valid rank, so I think we need this?
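A one-line check of the point (plain Python): rank 0 is falsy, so a bare truthiness test would silently drop gathers to rank 0.

```python
gather_rank = 0
print(bool(gather_rank))             # False -- `if gather_rank:` would misfire
print(isinstance(gather_rank, int))  # True  -- the explicit type check works
```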

comm = self._distributor.comm
rank = comm.Get_rank()

sendbuf = np.array(self[:].flatten())
Contributor:

flatten makes a copy, no? Wouldn't something like self.flat work here (a 1D view of the ndarray, without a copy)?

Contributor Author (@rhodrin):

Yes, indeed. Changed this to self.flat[:].
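A small numpy-only illustration of the copy-versus-view distinction behind the suggestion (independent of the Devito change itself): flatten() always allocates a new array, while ravel() and the .flat iterator work on the original buffer when the layout allows.

```python
import numpy as np

a = np.arange(12).reshape(3, 4)

flat_copy = a.flatten()   # always a copy
flat_view = a.ravel()     # a view when possible
it = a.flat               # np.flatiter over the original buffer

print(np.shares_memory(a, flat_copy))  # False
print(np.shares_memory(a, flat_view))  # True
print(it[5] == a[1, 1])                # True: same element, no extra copy
```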


# If necessary, add the time index to the `topology` as this will
# be required to correctly construct various maps.
if len(np.amax(dat_len)) > len(topology):
Contributor:

Is amax really needed instead of max?
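For reference, a small standalone comparison (the dat_len list below is a made-up stand-in for the variable in the diff): the builtin max compares shapes lexicographically, whereas np.amax, which is equivalent to np.max, reduces elementwise over an axis.

```python
import numpy as np

# Made-up stand-in: a list of per-rank shapes.
dat_len = [(4, 4), (4, 5), (5, 4)]

print(max(dat_len))                         # (5, 4) -- builtin, lexicographic
print(np.amax(dat_len, axis=0))             # [5 5]  -- elementwise maximum
print(np.amax(dat_len) == np.max(dat_len))  # True   -- amax and max agree
```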

@mloubout merged commit a647eca into master on Sep 14, 2022
@mloubout deleted the fix_mpi_slicing_p2 branch on September 14, 2022, 17:46
This was linked to issues on Sep 19, 2022
@georgebisbas mentioned this pull request on Sep 19, 2022
Labels: bug-py, MPI (mpi-related)
Projects: none yet
Development: successfully merging this pull request may close these issues:
  MPI gather for TimeFunction
  MPI slicing broken

4 participants