Map Clean Up #3198

Merged: 23 commits, Sep 7, 2023

Commits (23)
6bd2a9a  Refactor: Remove parallel option in favor of setting outside of map f… (CSSFrancis, Jul 19, 2023)
afe9608  NewFeature: Added navigation chunks parameter for setting the navigat… (CSSFrancis, Jul 19, 2023)
d2dd0f5  Testing: Removed all references to parallel kwarg (CSSFrancis, Jul 20, 2023)
e35f476  Testing: Removed parallel reference in documentation (CSSFrancis, Jul 20, 2023)
69d116e  Documentation: Added examples using Single threaded and distributed b… (CSSFrancis, Jul 20, 2023)
e52b342  Documentation: Add changelog (CSSFrancis, Jul 20, 2023)
f14b8d8  Testing: Add tests for setting chunks (CSSFrancis, Jul 20, 2023)
272d7d2  Documentation: Added additional example for map function (CSSFrancis, Jul 20, 2023)
904a4e0  Refactor: Remove parallel option in favor of setting outside of map f… (CSSFrancis, Jul 19, 2023)
dec3661  NewFeature: Added navigation chunks parameter for setting the navigat… (CSSFrancis, Jul 19, 2023)
59a26f0  Testing: Removed all references to parallel kwarg (CSSFrancis, Jul 20, 2023)
0e78479  Testing: Removed parallel reference in documentation (CSSFrancis, Jul 20, 2023)
10759f9  Documentation: Added examples using Single threaded and distributed b… (CSSFrancis, Jul 20, 2023)
7a728cd  Documentation: Add changelog (CSSFrancis, Jul 20, 2023)
868157b  Testing: Add tests for setting chunks (CSSFrancis, Jul 20, 2023)
79ffe2a  Documentation: Added additional example for map function (CSSFrancis, Jul 20, 2023)
380cf35  Tweak documentation (ericpre, Sep 2, 2023)
c22e028  Fix `_map_all`: remove axis when necessary (ericpre, Sep 2, 2023)
13e7f7e  Rename `max_workers` to `num_workers` to be consistent with dask (ericpre, Sep 2, 2023)
1f584d8  Merge remote-tracking branch 'origin/map_clean_up' into map_clean_up (CSSFrancis, Sep 6, 2023)
59494af  Documentation: Moved documentation for dask-scheduling (CSSFrancis, Sep 6, 2023)
9a341f5  Documentation: Fixed codeblock not showing up and fixed spelling for … (CSSFrancis, Sep 7, 2023)
1b2f0cc  Documentation: Fixed spelling error (CSSFrancis, Sep 7, 2023)
115 changes: 113 additions & 2 deletions doc/user_guide/big_data.rst
@@ -275,7 +275,7 @@ Lazy data processing on GPUs requires explicitly transferring the data to the
GPU.

On Linux, it is recommended to use the
`dask_cuda <https://docs.rapids.ai/api/dask-cuda/stable/index.html>`_ library
(not supported on Windows) to manage the dask scheduler. As for CPU lazy
processing, if the dask scheduler is not specified, the default scheduler
will be used.
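
A minimal sketch of that setup (``LocalCUDACluster`` comes from ``dask_cuda``; it
starts one worker per visible GPU by default, so the cluster size is an assumption
about your hardware):

.. code-block:: python

    >>> from dask_cuda import LocalCUDACluster
    >>> from dask.distributed import Client

    >>> cluster = LocalCUDACluster()  # one worker per visible GPU by default
    >>> client = Client(cluster)  # registers the cluster as the default scheduler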
@@ -318,7 +318,7 @@ Most curve-fitting functionality will automatically work on models created from
lazily loaded signals. HyperSpy extracts the relevant chunk from the signal and fits to that.

The linear ``'lstsq'`` optimizer supports fitting the entire dataset in a vectorised manner
using :py:func:`dask.array.linalg.lstsq`. This can give potentially enormous performance benefits over fitting
with a nonlinear optimizer, but comes with the restrictions explained in the :ref:`linear fitting<linear_fitting-label>` section.
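
A hedged sketch of how that is invoked (assuming ``hyperspy.api`` imported as ``hs``,
a lazy signal ``s``, and a model containing only components that are linear in their
free parameters, such as a polynomial):

.. code-block:: python

    >>> m = s.create_model()
    >>> m.append(hs.model.components1D.Polynomial(order=2))
    >>> m.multifit(optimizer='lstsq')  # vectorised linear fit over the whole dataset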

Practical tips
@@ -433,6 +433,117 @@ compute the result of all functions that are affected by the axes
parameters. This is the reason why e.g. the result of
:py:meth:`~._signals.signal1d.Signal1D.shift1D` is not lazy.

.. _dask_backends:

Dask Backends
-------------

Dask is a flexible library for parallel computing in Python. All of the lazy operations in
HyperSpy run through Dask, which can schedule computations on a single machine or
scale them out to a cluster. The following examples show how to run computations on a
variety of different hardware:

Single Threaded Scheduler
^^^^^^^^^^^^^^^^^^^^^^^^^

The single threaded scheduler in dask is useful for debugging and testing. It is not
recommended for general use.

.. code-block:: python

>>> import dask
>>> import hyperspy.api as hs
>>> import numpy as np
>>> import dask.array as da

>>> # setting the scheduler to single-threaded globally
>>> dask.config.set(scheduler='single-threaded')

Alternatively, you can set the scheduler to single-threaded for a single function call by
passing ``scheduler='single-threaded'`` as a keyword argument, or, for something like
plotting, for the duration of the call by using the ``with dask.config.set`` context manager.

.. code-block:: python

>>> s.compute(scheduler="single-threaded") # uses the single-threaded scheduler

>>> with dask.config.set(scheduler='single-threaded'):
...     s.plot() # computes each chunk single-threaded and passes one chunk at a time to memory

Single Machine Schedulers
^^^^^^^^^^^^^^^^^^^^^^^^^
Dask has two schedulers available for single machines.

1. Threaded Scheduler:
Fastest to set up, but it only provides parallelism through threads, so only non-Python functions will be parallelized.
This is a good choice if your code is largely NumPy-based and you do not have too many cores.
2. Processes Scheduler:
Each task (and all of its necessary dependencies) is shipped to a separate process, so it has a larger setup
time. This performs well for Python-dominated code.

.. code-block:: python

>>> import dask
>>> dask.config.set(scheduler='processes') # override the default with the multiprocessing scheduler
>>> # Any hyperspy code will now use the multiprocessing scheduler
>>> s.compute() # uses the multiprocessing scheduler

>>> dask.config.set(scheduler='threads') # override the default with the threading scheduler
>>> # Any hyperspy code will now use the threading scheduler
>>> s.compute() # uses the threading scheduler


Distributed Scheduler
^^^^^^^^^^^^^^^^^^^^^

The recommended way to use Dask is with the distributed scheduler, which allows you to scale your
computations to a cluster of machines and can be used on a single machine as well. ``dask-distributed``
also gives you access to the Dask dashboard, which allows you to monitor your computations.

Some operations, such as the matrix decomposition algorithms in HyperSpy, don't currently work with
the distributed scheduler.
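
One possible workaround in that case (a sketch only; it assumes the single-machine
threaded scheduler is acceptable for that step and that an explicit ``dask.config``
setting takes precedence over the active client) is to force a local scheduler for
just the decomposition:

.. code-block:: python

    >>> import dask
    >>> with dask.config.set(scheduler='threads'):
    ...     s.decomposition(algorithm='SVD', output_dimension=10)

Setting up the distributed scheduler on a single machine looks like this: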

.. code-block:: python

>>> from dask.distributed import Client
>>> from dask.distributed import LocalCluster
>>> import dask.array as da
>>> import hyperspy.api as hs

>>> cluster = LocalCluster()
>>> client = Client(cluster)
>>> client
>>> # Any calculation will now use the distributed scheduler
>>> s # lazy signal
>>> s.plot() # uses the distributed scheduler to compute each chunk and then passes one chunk at a time to memory
>>> s.compute() # uses distributed scheduler
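
The dashboard mentioned above is reachable through the client; a small aside (the
address below is illustrative and shows dask's default port):

.. code-block:: python

    >>> client.dashboard_link
    'http://127.0.0.1:8787/status'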

Running computations on a remote cluster can be done easily using ``dask_jobqueue``.

Inline review (Member): "Move this example here, the example of the gallery needs to be run and I am not sure that it would be possible/easy on CI."

Reply (Member Author): "Yeah, I think maybe the distributed workflow would run but not the slurm workflow, so you are right we should move it."

.. code-block:: python

>>> from dask_jobqueue import SLURMCluster # or whatever scheduler you use
>>> from dask.distributed import Client
>>> cluster = SLURMCluster(cores=48,
...                        memory='120Gb',
...                        walltime="01:00:00",
...                        queue='research')
>>> cluster.scale(jobs=3) # get 3 nodes
>>> client = Client(cluster)
>>> client

Any calculation will now use the distributed scheduler:

.. code-block:: python

>>> s = hs.datasets.example_signals.EDS_SEM_Spectrum()
>>> repeated_data = da.repeat(da.array(s.data[np.newaxis, :]), 10, axis=0)
>>> s = hs.signals.Signal1D(repeated_data).as_lazy()
>>> summed = s.map(np.sum, inplace=False)
>>> summed.compute()


Limitations
-----------
26 changes: 0 additions & 26 deletions doc/user_guide/signal/generic_tools.rst
@@ -345,32 +345,6 @@ data (default, ``True``) or storing it to a new signal (``False``).
(512, 512)
(724, 724)

.. _parallel-map-label:

The execution can be sped up by passing ``parallel`` keyword to the
:py:meth:`~.api.signals.BaseSignal.map` method:

.. code-block:: python

>>> import time
>>> def slow_func(data):
... time.sleep(1.)
... return data + 1
>>> s = hs.signals.Signal1D(np.arange(40).reshape((20, 2)))
>>> s
<Signal1D, title: , dimensions: (20|2)>
>>> s.map(slow_func, parallel=False)
100%|██████████████████████████████████████| 20/20 [00:20<00:00, 1.00s/it]
>>> # some operations will be done in parallel:
>>> s.map(slow_func, parallel=True)
100%|██████████████████████████████████████| 20/20 [00:02<00:00, 6.73it/s]

.. note::

HyperSpy implements *thread-based* parallelism for the :py:meth:`~.api.signals.BaseSignal.map`
method. You can control the number of threads that are created by passing an integer value
to the ``max_workers`` keyword argument. By default, it will use ``min(32, os.cpu_count())``.

.. versionadded:: 1.4
Iterating over signal using a parameter with no navigation dimension.

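For code that used the removed keywords, a hedged before/after sketch of the migration
this PR implies (``slow_func`` is a toy stand-in; ``num_workers`` follows the rename in
commit 13e7f7e):

.. code-block:: python

    >>> import numpy as np
    >>> import hyperspy.api as hs
    >>> def slow_func(data):
    ...     return data + 1
    >>> s = hs.signals.Signal1D(np.arange(40).reshape((20, 2)))
    >>> # before this PR: s.map(slow_func, parallel=True, max_workers=4)
    >>> s.map(slow_func, num_workers=4, ragged=False)
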
11 changes: 6 additions & 5 deletions doc/user_guide/signal2d.rst
@@ -50,18 +50,19 @@ Sub-pixel accuracy can be achieved in two ways:
# combined upsampling and statistical method
>>> shifts = s.estimate_shift2D(reference="stat", sub_pixel_factor=20)

If you have a large stack of images, you can perform the image alignment step in
parallel by passing ``parallel=True``. You can control the number of threads used
with the ``max_workers`` argument. See the :ref:`map documentation <parallel-map-label>`
for more information.
If you have a large stack of images, the image alignment is automatically done in
parallel.

You can control the number of threads used with the ``num_workers`` argument, or by adjusting
the scheduler of the :ref:`dask backend <dask_backends>`.

.. code-block:: python

# Estimate shifts
>>> shifts = s.estimate_shift2D()

# Align images in parallel using 4 threads
>>> s.align2D(shifts=shifts, parallel=True, max_workers=4)
>>> s.align2D(shifts=shifts, num_workers=4)

.. _signal2D.crop:

10 changes: 4 additions & 6 deletions hyperspy/_signals/complex_signal.py
@@ -31,8 +31,7 @@
)
from hyperspy.docstrings.signal import (
SHOW_PROGRESSBAR_ARG,
PARALLEL_ARG,
MAX_WORKERS_ARG,
NUM_WORKERS_ARG,
LAZYSIGNAL_DOC,
)
from hyperspy.misc.utils import parse_quantity
@@ -168,7 +167,7 @@ def change_dtype(self, dtype):
'Complex data can only be converted into other complex dtypes!')

def unwrapped_phase(self, wrap_around=False, seed=None,
show_progressbar=None, parallel=None, max_workers=None):
show_progressbar=None, num_workers=None):
"""Return the unwrapped phase as an appropriate HyperSpy signal.

Parameters
@@ -184,7 +183,6 @@ def unwrapped_phase(self, wrap_around=False, seed=None,
seed of the PRNG to achieve deterministic behavior.
%s
%s
%s

Returns
-------
@@ -204,11 +202,11 @@
phase = self.phase
phase.map(unwrap_phase, wrap_around=wrap_around, seed=seed,
show_progressbar=show_progressbar, ragged=False,
parallel=parallel, max_workers=max_workers)
num_workers=num_workers)
phase.metadata.General.title = f'unwrapped {phase.metadata.General.title}'
return phase # Now unwrapped!

unwrapped_phase.__doc__ %= (SHOW_PROGRESSBAR_ARG, PARALLEL_ARG, MAX_WORKERS_ARG)
unwrapped_phase.__doc__ %= (SHOW_PROGRESSBAR_ARG, NUM_WORKERS_ARG)

def __call__(self, axes_manager=None, power_spectrum=False,
fft_shift=False, as_numpy=None):
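A hedged usage sketch of the renamed parameter on ``unwrapped_phase`` (the toy complex
signal below is illustrative):

.. code-block:: python

    >>> import numpy as np
    >>> import hyperspy.api as hs
    >>> s = hs.signals.ComplexSignal1D(np.exp(1j * np.linspace(0, 20, 100)))
    >>> phase = s.unwrapped_phase(wrap_around=False, num_workers=2)
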
11 changes: 4 additions & 7 deletions hyperspy/_signals/eels.py
@@ -46,8 +46,7 @@
)
from hyperspy.docstrings.signal import (
SHOW_PROGRESSBAR_ARG,
PARALLEL_ARG,
MAX_WORKERS_ARG,
NUM_WORKERS_ARG,
SIGNAL_MASK_ARG,
NAVIGATION_MASK_ARG,
LAZYSIGNAL_DOC,
@@ -1032,7 +1031,7 @@ def fourier_ratio_deconvolution(self, ll,

def richardson_lucy_deconvolution(self, psf, iterations=15,
show_progressbar=None,
parallel=None, max_workers=None):
num_workers=None):
"""1D Richardson-Lucy Poissonian deconvolution of
the spectrum by the given kernel.

@@ -1047,7 +1046,6 @@ def richardson_lucy_deconvolution(self, psf, iterations=15,
increasing the value will increase the noise amplification.
%s
%s
%s

Raises
------
@@ -1085,8 +1083,7 @@ def deconv_function(signal, kernel=None,

ds = self.map(deconv_function, kernel=psf, iterations=iterations,
psf_size=psf_size, show_progressbar=show_progressbar,
parallel=parallel, max_workers=max_workers,
ragged=False, inplace=False)
num_workers=num_workers, ragged=False, inplace=False)

ds.metadata.General.title += (
' after Richardson-Lucy deconvolution %i iterations' %
@@ -1096,7 +1093,7 @@
'_after_R-L_deconvolution_%iiter' % iterations)
return ds

richardson_lucy_deconvolution.__doc__ %= (SHOW_PROGRESSBAR_ARG, PARALLEL_ARG, MAX_WORKERS_ARG)
richardson_lucy_deconvolution.__doc__ %= (SHOW_PROGRESSBAR_ARG, NUM_WORKERS_ARG)

def _are_microscope_parameters_missing(self, ignore_parameters=[]):
"""
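A matching hedged sketch for the EELS rename (it assumes an EELS spectrum ``s`` and a
measured point-spread function ``psf``, typically the zero-loss peak, already exist):

.. code-block:: python

    >>> s_deconv = s.richardson_lucy_deconvolution(psf, iterations=15, num_workers=4)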