Improve lazy operations and Chunking #2617

Merged

Changes from 34 commits (41 commits total)
975f281
Rewrote map_iterate function for lazy signals
CSSFrancis Jan 9, 2021
67880a3
Added in process function blockwise to support lazy mapping
CSSFrancis Jan 9, 2021
135a3c0
Added in automated signal size and datatype determination. There are …
CSSFrancis Jan 12, 2021
1416f42
fixed some tests which weren't testing the right things.
CSSFrancis Jan 14, 2021
f078c8e
Fixed some peak finding
CSSFrancis Jan 14, 2021
4e059f7
cleaned up some more failing tests related to new lazy ragged operat…
CSSFrancis Jan 14, 2021
20ee37f
Renamed variable away from protected namespace
CSSFrancis Jan 14, 2021
063ecba
fixed my own error I created
CSSFrancis Jan 14, 2021
2434e91
fixed my own error I created (part2)
CSSFrancis Jan 15, 2021
5c69155
Changed conflicting tests with singleton signals.
CSSFrancis Jan 15, 2021
c803334
Added in new documentation
CSSFrancis Jan 15, 2021
840573e
Added in ability to explicitly define output datatype
CSSFrancis Jan 15, 2021
2746e27
End of the day commit
CSSFrancis Jan 15, 2021
aba5fe6
Added in the ability to directly pass signal datatype and size as arg…
CSSFrancis Jan 21, 2021
7e749fb
Cleaned up some of the code to make it more readable
CSSFrancis Jan 22, 2021
d2d6f57
Merge branch 'RELEASE_next_patch' into improve_lazy_operations
CSSFrancis Jan 22, 2021
d19d4e5
Skipped tests which try to call flags attribute of dask arrays which …
CSSFrancis Feb 25, 2021
ea63977
Merge branch 'RELEASE_next_patch' into improve_lazy_operations
CSSFrancis Feb 25, 2021
9ec3949
Cleaned up testing
CSSFrancis Feb 25, 2021
7623917
Rechunking concatenated data so that chunks span signal dimension.
CSSFrancis Mar 1, 2021
d274187
bumped numpy from 1.16 -> 1.17
CSSFrancis Mar 1, 2021
6c49f57
bumped numpy from 1.16 -> 1.17
CSSFrancis Mar 1, 2021
56c71f6
bumped dask from 2.0 -> 2.1.0
CSSFrancis Mar 1, 2021
ff03ba9
Bump numpy to 1.17.1 to avoid regression with np.lexsort.
ericpre Mar 1, 2021
989429b
Updated documentation and simplified predictive methods
CSSFrancis Mar 15, 2021
5546191
Merge remote-tracking branch 'origin/improve_lazy_operations' into im…
CSSFrancis Mar 15, 2021
83e9a0a
Fixed align 2d method
CSSFrancis Mar 16, 2021
29ed563
Merge branch 'RELEASE_next_patch' into improve_lazy_operations
CSSFrancis Mar 16, 2021
bdf4e75
Computed peak locations rather than returning lazy positions
CSSFrancis Mar 16, 2021
62ac3fe
Rechunking using kwargs instead of passing keywords
CSSFrancis Mar 17, 2021
c4751cc
Improved documentation and minor code changes
CSSFrancis Mar 22, 2021
5defd64
Fixed map function with iterating signal beside it
CSSFrancis Mar 25, 2021
678eab6
Added documentation, replaced default it_kwarg value to None
CSSFrancis Mar 26, 2021
86cd97c
Added in test to assert chunks iterating alongside properly
CSSFrancis Mar 26, 2021
9645831
Got rid of duplication of map_iterate method
CSSFrancis Apr 1, 2021
9617444
Merge branch 'RELEASE_next_patch' into improve_lazy_operations
CSSFrancis Apr 1, 2021
87aad0d
Added in tests to up line coverage
CSSFrancis Apr 5, 2021
38d7132
Removed automatic conversion to non lazy signal
CSSFrancis Apr 5, 2021
5438aa7
Should increase coverage for lazy iterating signal
CSSFrancis Apr 5, 2021
5a3eba5
Merge remote-tracking branch 'upstream/RELEASE_next_patch' into impro…
ericpre Apr 7, 2021
f8ab0ce
Add entry to changelog.
ericpre Apr 7, 2021
8 changes: 4 additions & 4 deletions .github/workflows/tests.yml
@@ -11,7 +11,7 @@ jobs:
MPLBACKEND: agg
PIP_ARGS: --upgrade -e
PYTEST_ARGS: --pyargs hyperspy --reruns 3 -n 2 --instafail
PYTEST_ARGS_COVERAGE:
strategy:
fail-fast: false
matrix:
@@ -20,7 +20,7 @@
PIP_SELECTOR: ['[all, tests]']
DEPENDENCIES_DEV: [false]
include:
# test oldest supported version of main dependencies on python 3.6
- os: ubuntu
PYTHON_VERSION: 3.6
OLDEST_SUPPORTED_VERSION: true
@@ -49,7 +49,7 @@ jobs:
PYTHON_VERSION: 3.9
PIP_SELECTOR: '[all, tests]'
exclude:
# redundant build (same as coverage)
- os: ubuntu
PYTHON_VERSION: 3.7

@@ -88,7 +88,7 @@ jobs:
pytest ${{ env.PYTEST_ARGS }} ${{ matrix.PYTEST_ARGS_COVERAGE }}

- name: Upload coverage to Codecov
if: ${{ always() }} && ${{ matrix.PYTEST_ARGS_COVERAGE }}
uses: codecov/codecov-action@v1

build_doc:
18 changes: 16 additions & 2 deletions doc/dev_guide/lazy_computations.rst
@@ -18,15 +18,29 @@ the two arrays are indeed almost identical, the most important differences are
shape of the result depends on the values and cannot be inferred without
execution. Hence, few operations can be run on ``res`` lazily, and it should
be avoided if possible.
- **Computations in Dask are Lazy**: Dask only performs a computation when it has
  to. For example, the ``sum`` function isn't run until ``compute()`` is called.
  This also means that a function can be applied to only a portion of the data.

  .. code-block:: python

      summed_lazy_signal = lazy_signal.sum(axis=lazy_signal.axes_manager.signal_axes)  # Dask sets up tasks but does not compute
      summed_lazy_signal.inav[0:10].compute()  # computes the sum for only the first ten navigation positions
      summed_lazy_signal.compute()  # runs the full sum



The easiest way to add new methods that work both with arbitrary navigation
dimensions and ``LazySignals`` is by using the ``map`` method to map your function ``func`` across
all "navigation pixels" (e.g. spectra in a spectrum-image). ``map`` methods
will run the function on all pixels efficiently and put the results back in the
correct order. ``func`` is not constrained by ``dask`` and can use whatever
code (assignment, etc.) you wish.

The ``map`` function is flexible and should be able to handle most operations
that operate on some signal. If you pass a ``BaseSignal`` with the same
navigation size as the mapped signal, it will be iterated alongside it;
otherwise, a keyword argument is assumed to be constant and is applied to
every signal, as in the sketch below.
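
For example, a minimal sketch (made-up shapes and names) that multiplies each
image in a stack by a per-navigation-pixel scale factor:

.. code-block:: python

    import numpy as np
    import hyperspy.api as hs

    # lazy stack of 8x8 images with navigation shape (3, 2)
    img_stack = hs.signals.Signal2D(np.random.random((2, 3, 8, 8))).as_lazy()
    # one value per navigation position; transposed so it iterates alongside
    scales = hs.signals.BaseSignal(np.arange(6).reshape(2, 3)).T

    def rescale(image, scale):
        # ``scale`` is the value of ``scales`` at the current navigation index
        return image * scale

    result = img_stack.map(rescale, scale=scales, inplace=False)
    result.compute()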

If the new method cannot be coerced into a shape suitable for ``map``, separate
cases for lazy signals will have to be written. If a function operates on
arbitrary-sized arrays and the shape of the output can be known before calling,
1 change: 0 additions & 1 deletion hyperspy/_signals/hologram_image.py
@@ -97,7 +97,6 @@ def _parse_sb_size(s, reference, sb_position, sb_size, parallel):
sb_size = BaseSignal(sb_size)
if isinstance(sb_size.data, daArray):
sb_size = sb_size.as_lazy()

if sb_size.axes_manager.navigation_size != s.axes_manager.navigation_size:
if sb_size.axes_manager.navigation_size:
raise ValueError('Sideband size dimensions do not match '
211 changes: 138 additions & 73 deletions hyperspy/_signals/lazy.py
@@ -38,7 +38,8 @@
get_signal_chunk_slice)
from hyperspy.misc.hist_tools import histogram_dask
from hyperspy.misc.machine_learning import import_sklearn
from hyperspy.misc.utils import (multiply, dummy_context_manager, isiterable,
                                 process_function_blockwise, guess_output_signal_size,)


_logger = logging.getLogger(__name__)
@@ -131,6 +132,41 @@ def compute(self, close_file=False, show_progressbar=None, **kwargs):

compute.__doc__ %= SHOW_PROGRESSBAR_ARG

    def rechunk(self,
                nav_chunks="auto",
                sig_chunks=-1,
                inplace=True,
                **kwargs):
        """Rechunks the data using the same rechunking formula from Dask,
        except that the navigation and signal chunks are defined separately.
        Note that for most functions sig_chunks should remain -1 so that the
        chunks span the entire signal.

        Parameters
        ----------
        nav_chunks : {tuple, int, "auto", None}
            The navigation block dimensions to create.
            -1 indicates the full size of the corresponding dimension.
            Default is "auto", which automatically determines chunk sizes.
        sig_chunks : {tuple, int, "auto", None}
            The signal block dimensions to create.
            -1 indicates the full size of the corresponding dimension.
            Default is -1, which spans the full signal dimension.
        **kwargs : dict
            Any other keyword arguments for :py:func:`dask.array.rechunk`.
        """
        if not isinstance(sig_chunks, tuple):
            sig_chunks = (sig_chunks,) * len(self.axes_manager.signal_shape)
        if not isinstance(nav_chunks, tuple):
            nav_chunks = (nav_chunks,) * len(self.axes_manager.navigation_shape)
        new_chunks = nav_chunks + sig_chunks
        if inplace:
            self.data = self.data.rechunk(new_chunks, **kwargs)
        else:
            return self._deepcopy_with_new_data(
                self.data.rechunk(new_chunks, **kwargs))
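    # A minimal usage sketch (hypothetical shapes; ``s`` is any lazy Signal2D),
    # showing both rechunking modes; assumes ``import numpy as np`` and
    # ``import hyperspy.api as hs``:
    #
    #     s = hs.signals.Signal2D(np.zeros((64, 64, 128, 128))).as_lazy()
    #     s.rechunk(nav_chunks=(32, 32), sig_chunks=-1)     # in place
    #     s2 = s.rechunk(nav_chunks="auto", inplace=False)  # returns a copy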


def close_file(self):
"""Closes the associated data file if any.

@@ -225,6 +261,11 @@ def _get_dask_chunks(self, axis=None, dtype=None):
chunks.append((dc.shape[i], ))
return tuple(chunks)

def _get_navigation_chunk_size(self):
nav_axes = self.axes_manager.navigation_indices_in_array
nav_chunks = tuple([self.data.chunks[i] for i in sorted(nav_axes)])
return nav_chunks

def _make_lazy(self, axis=None, rechunk=False, dtype=None):
self.data = self._lazy_data(axis=axis, rechunk=rechunk, dtype=dtype)

@@ -521,87 +562,111 @@ def _map_all(self, function, inplace=True, **kwargs):

    def _map_iterate(self,
                     function,
                     iterating_kwargs=None,
                     show_progressbar=None,
                     parallel=None,
                     max_workers=None,
                     ragged=False,
                     inplace=True,
                     output_signal_size=None,
                     output_dtype=None,
                     **kwargs):
        # unpacking keyword arguments
        if iterating_kwargs is None:
            iterating_kwargs = {}
        nav_indexes = self.axes_manager.navigation_indices_in_array
        if ragged and inplace:
            raise ValueError("Ragged and inplace are not compatible "
                             "with a lazy signal")
        chunk_span = np.equal(self.data.chunksize, self.data.shape)
        chunk_span = [chunk_span[i]
                      for i in self.axes_manager.signal_indices_in_array]
        if not all(chunk_span):
            _logger.info("The chunk size needs to span the full signal size, "
                         "rechunking...")
            old_sig = self.rechunk(inplace=False)
        else:
            old_sig = self
        # try to guess output dtype and signal size?
        autodetermine = (output_signal_size is None or output_dtype is None)
        nav_chunks = old_sig._get_navigation_chunk_size()
        args = ()
        arg_keys = ()
        for key in iterating_kwargs:
            if iterating_kwargs[key]._lazy:
                if iterating_kwargs[key]._get_navigation_chunk_size() != nav_chunks:
                    iterating_kwargs[key].rechunk(nav_chunks=nav_chunks)
            else:
                iterating_kwargs[key] = iterating_kwargs[key].as_lazy()
                iterating_kwargs[key].rechunk(nav_chunks=nav_chunks)
            extra_dims = (len(old_sig.axes_manager.signal_shape) -
                          len(iterating_kwargs[key].axes_manager.signal_shape))
            if extra_dims > 0:
                old_shape = iterating_kwargs[key].data.shape
                new_shape = old_shape + (1,) * extra_dims
                args += (iterating_kwargs[key].data.reshape(new_shape), )
            else:
                args += (iterating_kwargs[key].data, )
            arg_keys += (key,)
        if autodetermine:
            # trying to guess the output dtype and size from one signal
            testing_kwargs = {}
            for key in iterating_kwargs:
                test_ind = (0,) * len(old_sig.axes_manager.navigation_axes)
                testing_kwargs[key] = np.squeeze(
                    iterating_kwargs[key].inav[test_ind].data).compute()
            testing_kwargs = {**kwargs, **testing_kwargs}
            test_data = np.array(
                old_sig.inav[(0,) * len(old_sig.axes_manager.navigation_shape)].data.compute())
            output_signal_size, output_dtype = guess_output_signal_size(
                test_signal=test_data,
                function=function,
                ragged=ragged,
                **testing_kwargs)
        # dropping/adding axes
        if output_signal_size == old_sig.axes_manager.signal_shape:
            drop_axis = None
            new_axis = None
            axes_changed = False
        else:
            axes_changed = True
            if len(output_signal_size) != len(old_sig.axes_manager.signal_shape):
                drop_axis = old_sig.axes_manager.signal_indices_in_array
                new_axis = tuple(range(len(output_signal_size)))
            else:
                drop_axis = [it for (o, i, it) in zip(
                                 output_signal_size,
                                 old_sig.axes_manager.signal_shape,
                                 old_sig.axes_manager.signal_indices_in_array)
                             if o != i]
                new_axis = drop_axis
        chunks = tuple([old_sig.data.chunks[i]
                        for i in sorted(nav_indexes)]) + output_signal_size
        mapped = da.map_blocks(process_function_blockwise,
                               old_sig.data,
                               *args,
                               function=function,
                               nav_indexes=nav_indexes,
                               drop_axis=drop_axis,
                               new_axis=new_axis,
                               output_signal_size=output_signal_size,
                               dtype=output_dtype,
                               chunks=chunks,
                               arg_keys=arg_keys,
                               **kwargs)
        if inplace:
            self.data = mapped
            sig = self
        else:
            sig = self._deepcopy_with_new_data(mapped)
        if ragged:
            sig.axes_manager.remove(sig.axes_manager.signal_axes)
            sig._lazy = True
            sig._assign_subclass()
            return sig
        # remove axes if there are now too many
        if axes_changed:
            sig.axes_manager.remove(
                sig.axes_manager.signal_axes[len(output_signal_size):])
            # add additional required axes
            for ind in range(
                    len(output_signal_size) - sig.axes_manager.signal_dimension,
                    0, -1):
                sig.axes_manager._append_axis(output_signal_size[-ind],
                                              navigate=False)
        if not ragged:
            sig.get_dimensions_from_data()
        if len(sig.axes_manager._axes) == 0:
            sig.axes_manager._append_axis(1, navigate=True, name='Scalar')
        return sig
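    # A toy sketch (plain dask, hypothetical shapes) of the strategy used above:
    # ``da.map_blocks`` with ``drop_axis``/``new_axis`` and explicit output
    # ``chunks`` lets each block of navigation pixels map to a new signal shape.
    #
    #     import numpy as np
    #     import dask.array as da
    #
    #     x = da.random.random((6, 8, 8), chunks=(2, 8, 8))  # signal in one chunk
    #
    #     def block_max_pos(block):
    #         out = np.empty((block.shape[0], 2), dtype=np.int64)
    #         for i in range(block.shape[0]):
    #             out[i] = np.unravel_index(block[i].argmax(), block[i].shape)
    #         return out
    #
    #     res = da.map_blocks(block_max_pos, x, drop_axis=(1, 2), new_axis=(1,),
    #                         chunks=((2, 2, 2), (2,)), dtype=np.int64)
    #     res.compute().shape  # (6, 2)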

def _iterate_signal(self):
if self.axes_manager.navigation_size < 2: