Merge pull request #2703 from magnunor/non_lazy_map_improvements
Improve non-lazy map method
ericpre committed Jan 5, 2022
2 parents c43bda5 + a4a75d3 commit 1ffb3fa
Showing 8 changed files with 1,051 additions and 495 deletions.
50 changes: 46 additions & 4 deletions doc/user_guide/signal.rst
@@ -997,7 +997,8 @@ arguments as in the following example.
If the function calls do not all return identically-shaped results, only navigation
information is preserved, and the final result is an array where
each element corresponds to the result of the function (or arbitrary object
type). As such, most HyperSpy functions cannot operate on such Signal, and the
type). These are :ref:`ragged arrays<signal.ragged>` and have the dtype ``object``.
As such, most HyperSpy functions cannot operate on such signals, and the
data should be accessed directly.

The ``inplace`` keyword (by default ``True``) of the
@@ -1012,12 +1013,13 @@ data (default, ``True``) or storing it to a new signal (``False``).
>>> result = image_stack.map(scipy.ndimage.rotate,
... angle=angles.T,
... inplace=False,
... ragged=True,
... reshape=True)
100%|████████████████████████████████████████████| 4/4 [00:00<00:00, 18.42it/s]
>>> result
<BaseSignal, title: , dimensions: (4|)>
>>> image_stack.data.dtype
>>> result.data.dtype
dtype('O')
>>> for d in result.data.flat:
... print(d.shape)
@@ -1037,9 +1039,9 @@ The execution can be sped up by passing ``parallel`` keyword to the
>>> def slow_func(data):
... time.sleep(1.)
... return data + 1
>>> s = hs.signals.Signal1D(np.arange(20).reshape((20,1)))
>>> s = hs.signals.Signal1D(np.arange(40).reshape((20, 2)))
>>> s
<Signal1D, title: , dimensions: (20|1)>
<Signal1D, title: , dimensions: (20|2)>
>>> s.map(slow_func, parallel=False)
100%|██████████████████████████████████████| 20/20 [00:20<00:00, 1.00s/it]
>>> # some operations will be done in parallel:
@@ -1068,6 +1070,46 @@ navigation dimension of s.
100%|██████████| 10/10 [00:00<00:00, 2573.19it/s]
.. _lazy_output-map-label:

.. versionadded:: 1.7
    Get result as lazy signal

Especially when working with very large datasets, it can be useful not to
perform the computation immediately, for example when doing so would exhaust
the available memory. In that case, the ``lazy_output`` parameter can be used.

.. code-block:: python

    >>> from scipy.ndimage import gaussian_filter
    >>> s = hs.signals.Signal2D(np.random.random((4, 4, 128, 128)))
    >>> s_out = s.map(gaussian_filter, sigma=5, inplace=False, lazy_output=True)
    >>> s_out
    <LazySignal2D, title: , dimensions: (4, 4|128, 128)>
``s_out`` can then be saved to the hard drive, to avoid loading it into memory.
Alternatively, it can be computed and loaded into memory using ``s_out.compute()``,
as sketched below.

.. code-block:: python

    >>> s_out.save("gaussian_filter_file.hspy")
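A minimal sketch of the ``compute()`` route (it replaces the delayed data with
the computed values in place; progress output omitted):

.. code-block:: python

    >>> s_out.compute()  # runs the delayed Gaussian filtering now
    >>> s_out
    <Signal2D, title: , dimensions: (4, 4|128, 128)>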
Another advantage of using ``lazy_output=True`` is the ability to "chain" operations,
by running ``map`` on the output of a previous ``map`` operation: for example,
first running a Gaussian filter, followed by peak finding. This can
improve computation time and reduce memory usage.

.. code-block:: python

    >>> s_out = s.map(gaussian_filter, sigma=5, inplace=False, lazy_output=True)
    >>> from skimage.feature import blob_dog
    >>> s_out1 = s_out.map(blob_dog, threshold=0.05, inplace=False, ragged=True, lazy_output=False)
    >>> s_out1
    <BaseSignal, title: , dimensions: (4, 4|ragged)>
This is especially relevant for very large datasets, where memory use can be a
limiting factor.
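
Since the result of the chained operation above is ragged, the peak positions
found by ``blob_dog`` are read directly from the ``data`` attribute, which is
an object array spanning the navigation dimensions. A minimal sketch:

.. code-block:: python

    >>> s_out1.data.shape  # one entry per navigation position
    (4, 4)
    >>> peaks = s_out1.data[0, 0]  # one (row, column, sigma) row per detected blob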


Cropping
^^^^^^^^

133 changes: 1 addition & 132 deletions hyperspy/_signals/lazy.py
@@ -37,8 +37,7 @@
get_signal_chunk_slice)
from hyperspy.misc.hist_tools import histogram_dask
from hyperspy.misc.machine_learning import import_sklearn
from hyperspy.misc.utils import (multiply, dummy_context_manager, isiterable,
process_function_blockwise, guess_output_signal_size,)
from hyperspy.misc.utils import multiply, dummy_context_manager, isiterable


_logger = logging.getLogger(__name__)
@@ -706,136 +705,6 @@ def _calculate_summary_statistics(self, rechunk=True):
da.nanmax(data), )
return _mean, _std, _min, _q1, _q2, _q3, _max

def _map_all(self, function, inplace=True, **kwargs):
calc_result = dd(function)(self.data, **kwargs)
if inplace:
self.data = da.from_delayed(calc_result, shape=self.data.shape,
dtype=self.data.dtype)
return None
return self._deepcopy_with_new_data(calc_result)

def _map_iterate(self,
function,
iterating_kwargs=None,
show_progressbar=None,
parallel=None,
max_workers=None,
ragged=False,
inplace=True,
output_signal_size=None,
output_dtype=None,
**kwargs):
# unpacking keyword arguments
if iterating_kwargs is None:
iterating_kwargs = {}
elif isinstance(iterating_kwargs, (tuple, list)):
iterating_kwargs = dict((k, v) for k, v in iterating_kwargs)

nav_indexes = self.axes_manager.navigation_indices_in_array
if ragged and inplace:
raise ValueError("Ragged and inplace are not compatible with a lazy signal")
chunk_span = np.equal(self.data.chunksize, self.data.shape)
chunk_span = [chunk_span[i] for i in self.axes_manager.signal_indices_in_array]
if not all(chunk_span):
_logger.info("The chunk size needs to span the full signal size, rechunking...")
old_sig = self.rechunk(inplace=False)
else:
old_sig = self
autodetermine = (output_signal_size is None or output_dtype is None) # try to guess output dtype and sig size?
nav_chunks = old_sig._get_navigation_chunk_size()
args = ()
arg_keys = ()

for key in iterating_kwargs:
if not isinstance(iterating_kwargs[key], BaseSignal):
iterating_kwargs[key] = BaseSignal(iterating_kwargs[key].T).T
warnings.warn(
"Passing arrays as keyword arguments can be ambigous. "
"This is deprecated and will be removed in HyperSpy 2.0. "
"Pass signal instances instead.",
VisibleDeprecationWarning)
if iterating_kwargs[key]._lazy:
if iterating_kwargs[key]._get_navigation_chunk_size() != nav_chunks:
iterating_kwargs[key].rechunk(nav_chunks=nav_chunks)
else:
iterating_kwargs[key] = iterating_kwargs[key].as_lazy()
iterating_kwargs[key].rechunk(nav_chunks=nav_chunks)
extra_dims = (len(old_sig.axes_manager.signal_shape) -
len(iterating_kwargs[key].axes_manager.signal_shape))
if extra_dims > 0:
old_shape = iterating_kwargs[key].data.shape
new_shape = old_shape + (1,)*extra_dims
args += (iterating_kwargs[key].data.reshape(new_shape), )
else:
args += (iterating_kwargs[key].data, )
arg_keys += (key,)

if autodetermine: #trying to guess the output d-type and size from one signal
testing_kwargs = {}
for key in iterating_kwargs:
test_ind = (0,) * len(old_sig.axes_manager.navigation_axes)
testing_kwargs[key] = np.squeeze(iterating_kwargs[key].inav[test_ind].data).compute()
testing_kwargs = {**kwargs, **testing_kwargs}
test_data = np.array(old_sig.inav[(0,) * len(old_sig.axes_manager.navigation_shape)].data.compute())
output_signal_size, output_dtype = guess_output_signal_size(test_signal=test_data,
function=function,
ragged=ragged,
**testing_kwargs)
# Dropping/Adding Axes
if output_signal_size == old_sig.axes_manager.signal_shape:
drop_axis = None
new_axis = None
axes_changed = False
else:
axes_changed = True
if len(output_signal_size) != len(old_sig.axes_manager.signal_shape):
drop_axis = old_sig.axes_manager.signal_indices_in_array
new_axis = tuple(range(len(nav_indexes), len(nav_indexes) + len(output_signal_size)))
else:
drop_axis = [it for (o, i, it) in zip(output_signal_size,
old_sig.axes_manager.signal_shape,
old_sig.axes_manager.signal_indices_in_array)
if o != i]
new_axis = drop_axis
chunks = tuple([old_sig.data.chunks[i] for i in sorted(nav_indexes)]) + output_signal_size
mapped = da.map_blocks(process_function_blockwise,
old_sig.data,
*args,
function=function,
nav_indexes=nav_indexes,
drop_axis=drop_axis,
new_axis=new_axis,
output_signal_size=output_signal_size,
dtype=output_dtype,
chunks=chunks,
arg_keys=arg_keys,
**kwargs)
if inplace:
self.data = mapped
sig = self
else:
sig = self._deepcopy_with_new_data(mapped)
if ragged:
axes_dicts = self.axes_manager._get_axes_dicts(
self.axes_manager.navigation_axes
)
sig.axes_manager.__init__(axes_dicts)
sig.axes_manager._ragged = True
sig._assign_subclass()
return sig

# remove if too many axes
if axes_changed:
sig.axes_manager.remove(sig.axes_manager.signal_axes[len(output_signal_size):])
# add additional required axes
for ind in range(
len(output_signal_size) - sig.axes_manager.signal_dimension, 0, -1):
sig.axes_manager._append_axis(size=output_signal_size[-ind],
navigate=False)
if not ragged:
sig.get_dimensions_from_data()
return sig

def _block_iterator(self,
flat_signal=True,
get=dask.threaded.get,
50 changes: 28 additions & 22 deletions hyperspy/_signals/signal1d.py
@@ -558,11 +558,13 @@ def interpolating_function(dat):
**kwargs)
dat[i1:i2] = dat_int(list(range(i1, i2)))
return dat
self._map_iterate(interpolating_function,
ragged=False,
parallel=parallel,
show_progressbar=show_progressbar,
max_workers=max_workers)
self.map(
interpolating_function,
ragged=False,
parallel=parallel,
show_progressbar=show_progressbar,
max_workers=max_workers,
)
self.events.data_changed.trigger(obj=self)

interpolate_in_between.__doc__ %= (SHOW_PROGRESSBAR_ARG, PARALLEL_ARG, MAX_WORKERS_ARG)
@@ -671,7 +673,7 @@ def estimate_shift1D(
if interpolate is True:
shift_array = shift_array / ip
shift_array = shift_array * axis.scale
if self._lazy:
if shift_signal._lazy:
# We must compute right now because otherwise any changes to the
# axes_manager of the signal later in the workflow may result in
# a wrong shift_array
@@ -1488,7 +1490,8 @@ def find_peaks1D_ohaver(self, xdim=None,
ragged=True,
parallel=parallel,
max_workers=max_workers,
inplace=False)
inplace=False,
lazy_output=False)
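# lazy_output=False ensures the ragged output is computed here, so that
# `peaks.data` below is a plain object-dtype numpy array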
return peaks.data

find_peaks1D_ohaver.__doc__ %= (PARALLEL_ARG, MAX_WORKERS_ARG)
@@ -1556,19 +1559,20 @@ def estimate_peak_width(
)
parallel = False

axis = self.axes_manager.signal_axes[0]
# x = axis.axis
# axis is a keyword already used by self.map so calling this axis_arg
# to avoid "parameter collision"
axis_arg = self.axes_manager.signal_axes[0]
maxval = self.axes_manager.navigation_size
show_progressbar = show_progressbar and maxval > 0

def estimating_function(spectrum,
window=None,
factor=0.5,
axis=None):
x = axis.axis
axis_arg=None):
x = axis_arg.axis
if window is not None:
vmax = axis.index2value(spectrum.argmax())
slices = axis._get_array_slices(
vmax = axis_arg.index2value(spectrum.argmax())
slices = axis_arg._get_array_slices(
slice(vmax - window * 0.5, vmax + window * 0.5))
spectrum = spectrum[slices]
x = x[slices]
@@ -1582,15 +1586,17 @@ def estimating_function(spectrum,
else:
return np.full((2,), np.nan)

both = self._map_iterate(estimating_function,
window=window,
factor=factor,
axis=axis,
ragged=False,
inplace=False,
parallel=parallel,
show_progressbar=show_progressbar,
max_workers=None)
both = self.map(
estimating_function,
window=window,
factor=factor,
axis_arg=axis_arg,
ragged=False,
inplace=False,
parallel=parallel,
show_progressbar=show_progressbar,
max_workers=max_workers,
)
left, right = both.T.split()
width = right - left
if factor == 0.5:
10 changes: 10 additions & 0 deletions hyperspy/docstrings/signal.py
@@ -72,6 +72,16 @@
``None``, the default from the preferences settings is used. The number
of threads is controlled by the ``max_workers`` argument."""

LAZY_OUTPUT_ARG = \
"""lazy_output : None or bool
If ``True``, the output will be returned as a lazy signal. This means
the calculation itself will be delayed until either compute() is used,
or the signal is stored as a file.
If ``False``, the output will be returned as a non-lazy signal, this
means the outputs will be calculated directly, and loaded into memory.
If ``None`` the output will be lazy if the input signal is lazy, and
non-lazy if the input signal is non-lazy."""

MAX_WORKERS_ARG = \
"""max_workers : None or int
Maximum number of threads used when ``parallel=True``. If None, defaults
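For illustration, a minimal sketch of the three ``lazy_output`` modes documented
above (doctest-style; progress-bar output omitted, reprs as expected from HyperSpy):

.. code-block:: python

    >>> import numpy as np
    >>> import hyperspy.api as hs
    >>> s = hs.signals.Signal1D(np.arange(10.0).reshape((2, 5)))
    >>> s.map(np.sqrt, inplace=False)  # lazy_output=None: output lazy only if input is
    <Signal1D, title: , dimensions: (2|5)>
    >>> s.map(np.sqrt, inplace=False, lazy_output=True)  # force a delayed result
    <LazySignal1D, title: , dimensions: (2|5)>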
