Merge pull request #2617 from CSSFrancis/improve_lazy_operations
Improve lazy operations and Chunking
ericpre committed Apr 7, 2021
2 parents 97a2075 + f8ab0ce commit 499824b
Showing 15 changed files with 442 additions and 175 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/tests.yml
@@ -11,7 +11,7 @@ jobs:
MPLBACKEND: agg
PIP_ARGS: --upgrade -e
PYTEST_ARGS: --pyargs hyperspy --reruns 3 -n 2 --instafail
PYTEST_ARGS_COVERAGE:
strategy:
fail-fast: false
matrix:
@@ -20,7 +20,7 @@ jobs:
PIP_SELECTOR: ['[all, tests]']
DEPENDENCIES_DEV: [false]
include:
# test oldest supported version of main dependencies on python 3.6
- os: ubuntu
PYTHON_VERSION: 3.6
OLDEST_SUPPORTED_VERSION: true
@@ -49,7 +49,7 @@ jobs:
PYTHON_VERSION: 3.9
PIP_SELECTOR: '[all, tests]'
exclude:
# redundant build (same as coverage)
- os: ubuntu
PYTHON_VERSION: 3.7

@@ -88,7 +88,7 @@ jobs:
pytest ${{ env.PYTEST_ARGS }} ${{ matrix.PYTEST_ARGS_COVERAGE }}
- name: Upload coverage to Codecov
if: ${{ always() }} && ${{ matrix.PYTEST_ARGS_COVERAGE }}
uses: codecov/codecov-action@v1

build_doc:
2 changes: 2 additions & 0 deletions CHANGES.rst
@@ -49,6 +49,8 @@ RELEASE_next_patch (Unreleased)
* Fix setting offset in rebin: the offset was changed in the wrong axis (`#2690 <https://github.com/hyperspy/hyperspy/pull/2690>`_)
* Fix reading XRF bruker file, close `#2689 <https://github.com/hyperspy/hyperspy/issues/2689>`_ (`#2694 <https://github.com/hyperspy/hyperspy/pull/2694>`_)
* Speed up setting CI on azure pipeline (`#2694 <https://github.com/hyperspy/hyperspy/pull/2694>`_)
* Fix performance issue with the ``map`` method of lazy signals (`#2617 <https://github.com/hyperspy/hyperspy/pull/2617>`_)


Changelog
*********
18 changes: 16 additions & 2 deletions doc/dev_guide/lazy_computations.rst
@@ -18,15 +18,29 @@ the two arrays are indeed almost identical, the most important differences are
shape of the result depends on the values and cannot be inferred without
execution. Hence, few operations can be run on ``res`` lazily, and it should
be avoided if possible.
- **Computations in Dask are Lazy**: Dask only performs a computation when it has to. For example,
the sum function isn't run until ``compute`` is called. This also means that some functions can be
applied to only a portion of the data.

.. code-block:: python

    summed_lazy_signal = lazy_signal.sum(axis=lazy_signal.axes_manager.signal_axes)  # Dask sets up the task graph but does not compute
    summed_lazy_signal.inav[0:10].compute()  # computes the sum only for navigation indices 0:10
    summed_lazy_signal.compute()  # computes the sum for the whole signal

The easiest way to add new methods that work both with arbitrary navigation
dimensions and ``LazySignals`` is by using the ``map`` method to map your function ``func`` across
all "navigation pixels" (e.g. spectra in a spectrum-image). ``map`` methods
will run the function on all pixels efficiently and put the results back in the
correct order. ``func`` is not constrained by ``dask`` and can use whatever
code (assignment, etc.) you wish.

The ``map`` function is flexible and should be able to handle most operations that
operate on some signal. If you pass a ``BaseSignal`` with the same navigation size
as the signal, it will be iterated alongside the mapped signal; otherwise, a keyword
argument is assumed to be constant and is applied to every signal.
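
As an illustration, a minimal sketch of both cases might look like the following
(the signal ``s``, the ``sigmas`` signal and the use of ``scipy.ndimage.gaussian_filter1d``
are illustrative assumptions only):

.. code-block:: python

    import numpy as np
    import scipy.ndimage
    import hyperspy.api as hs

    # A small lazy spectrum image: 10 x 10 navigation pixels, 100-point spectra
    s = hs.signals.Signal1D(np.random.random((10, 10, 100))).as_lazy()

    # Constant keyword argument: the same sigma is applied to every navigation pixel
    s_smoothed = s.map(scipy.ndimage.gaussian_filter1d, sigma=2.0, inplace=False)

    # BaseSignal keyword argument with a matching navigation size:
    # one sigma value per navigation pixel is passed to the function
    sigmas = hs.signals.BaseSignal(np.linspace(1, 5, 100).reshape(10, 10)).T
    s_varied = s.map(scipy.ndimage.gaussian_filter1d, sigma=sigmas, inplace=False)

    # Nothing is computed until explicitly requested
    s_varied.compute()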

If the new method cannot be coerced into a shape suitable for ``map``, separate
cases for lazy signals will have to be written. If a function operates on
arbitrary-sized arrays and the shape of the output can be known before calling,
1 change: 0 additions & 1 deletion hyperspy/_signals/hologram_image.py
@@ -97,7 +97,6 @@ def _parse_sb_size(s, reference, sb_position, sb_size, parallel):
sb_size = BaseSignal(sb_size)
if isinstance(sb_size.data, daArray):
sb_size = sb_size.as_lazy()

if sb_size.axes_manager.navigation_size != s.axes_manager.navigation_size:
if sb_size.axes_manager.navigation_size:
raise ValueError('Sideband size dimensions do not match '
209 changes: 136 additions & 73 deletions hyperspy/_signals/lazy.py
Expand Up @@ -38,7 +38,8 @@
get_signal_chunk_slice)
from hyperspy.misc.hist_tools import histogram_dask
from hyperspy.misc.machine_learning import import_sklearn
from hyperspy.misc.utils import multiply, dummy_context_manager, isiterable
from hyperspy.misc.utils import (multiply, dummy_context_manager, isiterable,
process_function_blockwise, guess_output_signal_size,)


_logger = logging.getLogger(__name__)
@@ -131,6 +132,41 @@ def compute(self, close_file=False, show_progressbar=None, **kwargs):

compute.__doc__ %= SHOW_PROGRESSBAR_ARG

def rechunk(self,
nav_chunks="auto",
sig_chunks=-1,
inplace=True,
**kwargs):
"""Rechunks the data using the same rechunking formula from Dask expect
that the navigation and signal chunks are defined seperately. Note, for most
functions sig_chunks should remain None so that it spans the entire signal.
Parameters:
nav_chunks : {tuple, int, "auto", None}
The navigation block dimensions to create.
-1 indicates the full size of the corresponding dimension.
Default is “auto” which automatically determines chunk sizes.
sig_chunks : {tuple, int, "auto", None}
The signal block dimensions to create.
-1 indicates the full size of the corresponding dimension.
Default is -1, which spans the full signal dimension.
**kwargs : dict
Any other keyword arguments for `dask.array.rechunk`
"""
if not isinstance(sig_chunks, tuple):
sig_chunks = (sig_chunks,)*len(self.axes_manager.signal_shape)
if not isinstance(nav_chunks, tuple):
nav_chunks = (nav_chunks,)*len(self.axes_manager.navigation_shape)
new_chunks = nav_chunks + sig_chunks
if inplace:
self.data = self.data.rechunk(new_chunks,
**kwargs)
else:
return self._deepcopy_with_new_data(self.data.rechunk(new_chunks,
**kwargs)
)
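
A brief usage sketch of the method above (the signal ``s`` and its shape are hypothetical):

import numpy as np
import hyperspy.api as hs

s = hs.signals.Signal2D(np.random.random((10, 10, 64, 64))).as_lazy()

# Chunk the navigation axes into 5 x 5 blocks; each chunk spans the full signal
s.rechunk(nav_chunks=(5, 5))

# Let Dask choose the navigation chunking and return a rechunked copy instead
s_copy = s.rechunk(nav_chunks="auto", inplace=False)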


def close_file(self):
"""Closes the associated data file if any.
@@ -225,6 +261,11 @@ def _get_dask_chunks(self, axis=None, dtype=None):
chunks.append((dc.shape[i], ))
return tuple(chunks)

def _get_navigation_chunk_size(self):
nav_axes = self.axes_manager.navigation_indices_in_array
nav_chunks = tuple([self.data.chunks[i] for i in sorted(nav_axes)])
return nav_chunks

def _make_lazy(self, axis=None, rechunk=False, dtype=None):
self.data = self._lazy_data(axis=axis, rechunk=rechunk, dtype=dtype)

@@ -521,87 +562,109 @@ def _map_all(self, function, inplace=True, **kwargs):

def _map_iterate(self,
function,
iterating_kwargs=(),
iterating_kwargs=None,
show_progressbar=None,
parallel=None,
max_workers=None,
ragged=None,
ragged=False,
inplace=True,
output_signal_size=None,
output_dtype=None,
**kwargs):
if ragged not in (True, False):
raise ValueError('"ragged" kwarg has to be bool for lazy signals')
_logger.debug("Entering '_map_iterate'")

size = max(1, self.axes_manager.navigation_size)
from hyperspy.misc.utils import (create_map_objects,
map_result_construction)
func, iterators = create_map_objects(function, size, iterating_kwargs,
**kwargs)
iterators = (self._iterate_signal(), ) + iterators
res_shape = self.axes_manager._navigation_shape_in_array
# no navigation
if not len(res_shape) and ragged:
res_shape = (1,)

all_delayed = [dd(func)(data) for data in zip(*iterators)]

if ragged:
if inplace:
raise ValueError("In place computation is not compatible with "
"ragged array for lazy signal.")
# Shape of the signal dimension will change for each nav.
# index, which means we can't predict the shape and the dtype needs
# to be python object to support numpy ragged array
sig_shape = ()
sig_dtype = np.dtype('O')
# unpacking keyword arguments
if iterating_kwargs is None:
iterating_kwargs = {}
nav_indexes = self.axes_manager.navigation_indices_in_array
if ragged and inplace:
raise ValueError("Ragged and inplace are not compatible with a lazy signal")
chunk_span = np.equal(self.data.chunksize, self.data.shape)
chunk_span = [chunk_span[i] for i in self.axes_manager.signal_indices_in_array]
if not all(chunk_span):
_logger.info("The chunk size needs to span the full signal size, rechunking...")
old_sig = self.rechunk(inplace=False)
else:
one_compute = all_delayed[0].compute()
# No signal dimension for scalar
if np.isscalar(one_compute):
sig_shape = ()
sig_dtype = type(one_compute)
old_sig = self
autodetermine = (output_signal_size is None or output_dtype is None) # try to guess output dtype and sig size?
nav_chunks = old_sig._get_navigation_chunk_size()
args = ()
arg_keys = ()
for key in iterating_kwargs:
if iterating_kwargs[key]._lazy:
if iterating_kwargs[key]._get_navigation_chunk_size() != nav_chunks:
iterating_kwargs[key].rechunk(nav_chunks=nav_chunks)
else:
sig_shape = one_compute.shape
sig_dtype = one_compute.dtype
pixels = [
da.from_delayed(
res, shape=sig_shape, dtype=sig_dtype) for res in all_delayed
]
if ragged:
if show_progressbar is None:
from hyperspy.defaults_parser import preferences
show_progressbar = preferences.General.show_progressbar
# We compute here because it is not clear whether it is possible
# to make a ragged dask array: we need to provide a chunk size...
res_data = np.empty(res_shape, dtype=sig_dtype)
_logger.info("Lazy signal is computed to make the ragged array.")
if show_progressbar:
cm = ProgressBar
iterating_kwargs[key] = iterating_kwargs[key].as_lazy()
iterating_kwargs[key].rechunk(nav_chunks=nav_chunks)
extra_dims = (len(old_sig.axes_manager.signal_shape) -
len(iterating_kwargs[key].axes_manager.signal_shape))
if extra_dims > 0:
old_shape = iterating_kwargs[key].data.shape
new_shape = old_shape + (1,)*extra_dims
args += (iterating_kwargs[key].data.reshape(new_shape), )
else:
cm = dummy_context_manager
with cm():
try:
for i, pixel in enumerate(pixels):
res_data.flat[i] = pixel.compute()
except MemoryError:
raise MemoryError("The use of 'ragged' array requires the "
"computation of the lazy signal.")
args += (iterating_kwargs[key].data, )
arg_keys += (key,)
if autodetermine:  # try to guess the output dtype and signal size from one signal
testing_kwargs = {}
for key in iterating_kwargs:
test_ind = (0,) * len(old_sig.axes_manager.navigation_axes)
testing_kwargs[key] = np.squeeze(iterating_kwargs[key].inav[test_ind].data).compute()
testing_kwargs = {**kwargs, **testing_kwargs}
test_data = np.array(old_sig.inav[(0,) * len(old_sig.axes_manager.navigation_shape)].data.compute())
output_signal_size, output_dtype = guess_output_signal_size(test_signal=test_data,
function=function,
ragged=ragged,
**testing_kwargs)
# Dropping/Adding Axes
if output_signal_size == old_sig.axes_manager.signal_shape:
drop_axis = None
new_axis = None
axes_changed = False
else:
if len(pixels) > 0:
for step in reversed(res_shape):
_len = len(pixels)
starts = range(0, _len, step)
ends = range(step, _len + step, step)
pixels = [
da.stack(
pixels[s:e], axis=0) for s, e in zip(starts, ends)
]
res_data = pixels[0]

res = map_result_construction(
self, inplace, res_data, ragged, sig_shape, lazy=not ragged)

return res
axes_changed = True
if len(output_signal_size) != len(old_sig.axes_manager.signal_shape):
drop_axis = old_sig.axes_manager.signal_indices_in_array
new_axis = tuple(range(len(output_signal_size)))
else:
drop_axis = [it for (o, i, it) in zip(output_signal_size,
old_sig.axes_manager.signal_shape,
old_sig.axes_manager.signal_indices_in_array)
if o != i]
new_axis = drop_axis
chunks = tuple([old_sig.data.chunks[i] for i in sorted(nav_indexes)]) + output_signal_size
mapped = da.map_blocks(process_function_blockwise,
old_sig.data,
*args,
function=function,
nav_indexes=nav_indexes,
drop_axis=drop_axis,
new_axis=new_axis,
output_signal_size=output_signal_size,
dtype=output_dtype,
chunks=chunks,
arg_keys=arg_keys,
**kwargs)
if inplace:
self.data = mapped
sig = self
else:
sig = self._deepcopy_with_new_data(mapped)
if ragged:
sig.axes_manager.remove(sig.axes_manager.signal_axes)
sig._lazy = True
sig._assign_subclass()

return sig
# remove if too many axes
if axes_changed:
sig.axes_manager.remove(sig.axes_manager.signal_axes[len(output_signal_size):])
# add additional required axes
for ind in range(
len(output_signal_size) - sig.axes_manager.signal_dimension, 0, -1):
sig.axes_manager._append_axis(output_signal_size[-ind], navigate=False)
if not ragged:
sig.get_dimensions_from_data()
return sig

def _iterate_signal(self):
if self.axes_manager.navigation_size < 2:
