## Distributed Computing and the Dask Dashboard

Distributed computing is the idea that you can split one task into many smaller tasks, divide those tasks among different workers, and then aggregate the results!
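The split, divide, and aggregate pattern can be sketched with nothing but the Python standard library. This is a minimal illustration, not how dask works internally: a thread pool stands in for the workers, and the helper names `split_into_chunks` and `partial_sum` are made up for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(values, n_chunks):
    """Split a list into n_chunks roughly equal contiguous pieces."""
    size = -(-len(values) // n_chunks)  # ceiling division
    return [values[i:i + size] for i in range(0, len(values), size)]


def partial_sum(chunk):
    """The small task that each worker runs on its own chunk."""
    return sum(chunk)


data = list(range(1_000))

# 1. Split one big task into smaller tasks
chunks = split_into_chunks(data, n_chunks=4)

# 2. Divide the tasks among workers (threads stand in for workers here)
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_results = list(pool.map(partial_sum, chunks))

# 3. Aggregate the partial results
total = sum(partial_results)
print(total)  # 499500
```

Dask automates all three steps and also decides which worker runs which task.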

What is cool about distributed computing is that this idea __scales__! You can set this up on your home computer, on a high-performance workstation, or even on a cluster. Distributed computing is also nice because you can set limits: you can specify exactly how much RAM and how many cores to use, or even target multiple GPUs.
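As a rough sketch of what setting limits looks like with dask, the helper below starts a local cluster with an explicit number of workers, threads per worker, and memory cap per worker. The function name `start_limited_client` and its default values are assumptions chosen for illustration; `n_workers`, `threads_per_worker`, and `memory_limit` are keyword arguments that `Client` forwards to `LocalCluster` when no scheduler address is given.

```python
from dask.distributed import Client


def start_limited_client(n_workers=2, threads_per_worker=2,
                         memory_limit="2GiB", **kwargs):
    """Start a local cluster with explicit resource limits.

    memory_limit applies to each worker, so the total budget is
    n_workers * memory_limit (4 GiB with these illustrative defaults).
    """
    return Client(
        n_workers=n_workers,
        threads_per_worker=threads_per_worker,
        memory_limit=memory_limit,
        **kwargs,  # any extra options are passed straight to LocalCluster
    )
```

In a notebook you would call `client = start_limited_client()` and later `client.close()`. Pick values that fit your own machine.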



### How does this work?

Behind the scenes, `dask` handles all of the lazy computations for hyperspy. There are different schedulers, but the most powerful one is the [distributed](https://distributed.dask.org/en/latest/) backend.

<img style="left" src="imgs/dask-cluster-manager.svg">
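To make the idea of interchangeable schedulers concrete, here is a small sketch that uses a plain dask array rather than a hyperspy signal (an assumption made to keep the example self-contained). The same lazy computation is run with the single-threaded scheduler and with the threaded scheduler; once a `Client` has been created, a plain `.compute()` uses the distributed scheduler instead.

```python
import dask.array as da

# Build a lazy computation: nothing is calculated yet
x = da.arange(10, chunks=5)
lazy_total = x.sum()

# Run the same task graph with two different local schedulers
total_sync = lazy_total.compute(scheduler="synchronous")  # single thread, handy for debugging
total_threads = lazy_total.compute(scheduler="threads")   # thread pool on one machine

print(total_sync, total_threads)  # 45 45
```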

#### Pros:
1. The distributed backend is more robust and much less likely to crash from running out of RAM
2. The dashboard allows you to profile your code and identify slow parts to improve
3. Sharing of data between nodes is fairly efficient
4. One workflow scales from desktop to HPC

#### Cons (distributed computing is still under active development in hyperspy):
1. The `.hspy` format isn't currently supported (only the `.zspy` format is)
2. Certain functions aren't currently supported (the known ones are lazy decompositions and orientation mapping in pyxem)
3. Small operations are sometimes slower


#### Starting a distributed cluster the easy way:

This just starts a local cluster on your laptop or desktop

```python 
from dask.distributed import Client
client = Client()  # set up local cluster on your laptop
client
```

#### Starting a distributed cluster the moderately hard way

This starts a cluster running on multiple computers on an HPC cluster using the Slurm scheduler.

```python 
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Each job takes up one entire node on the compute cluster.
# You can also use smaller jobs, which might get scheduled faster.
cluster = SLURMCluster(
    cores=40,
    memory="120GB",
    walltime="02:00:00",
    queue="research",
)

cluster.scale(jobs=5)  # ask for 5 jobs (here, 5 nodes)
client = Client(cluster)  # this client now has 200 cores and 600 GB of RAM to do whatever we want!
```
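The totals quoted in the final comment follow directly from the job settings, since every job contributes the same resources. A quick check in plain Python (the variable names are just for illustration):

```python
# Resources requested per job in the SLURMCluster call
cores_per_job = 40
memory_per_job_gb = 120
n_jobs = 5

# Every job adds the same amount, so totals scale linearly with the job count
total_cores = n_jobs * cores_per_job
total_memory_gb = n_jobs * memory_per_job_gb

print(total_cores, total_memory_gb)  # 200 600
```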

## Let's Try it

In [1]:
from dask.distributed import Client
client = Client()  # set up local cluster on your laptop
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 40615 instead

**Client**

| Field | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | http://127.0.0.1:40615/status |

**LocalCluster**

| Field | Value |
|---|---|
| Dashboard | http://127.0.0.1:40615/status |
| Workers | 4 |
| Total threads | 12 |
| Total memory | 15.43 GiB |
| Status | running |
| Using processes | True |

**Scheduler**

| Field | Value |
|---|---|
| Comm | tcp://127.0.0.1:45963 |
| Dashboard | http://127.0.0.1:40615/status |
| Workers | 4 |
| Total threads | 12 |
| Total memory | 15.43 GiB |
| Started | Just now |

**Workers**

| Comm | Dashboard | Nanny | Total threads | Memory | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:46221 | http://127.0.0.1:38351/status | tcp://127.0.0.1:40675 | 3 | 3.86 GiB | /tmp/dask-scratch-space/worker-xtokyp4y |
| tcp://127.0.0.1:33627 | http://127.0.0.1:44771/status | tcp://127.0.0.1:36465 | 3 | 3.86 GiB | /tmp/dask-scratch-space/worker-2bb2e2tk |
| tcp://127.0.0.1:46393 | http://127.0.0.1:41635/status | tcp://127.0.0.1:42221 | 3 | 3.86 GiB | /tmp/dask-scratch-space/worker-nke1b5rx |
| tcp://127.0.0.1:43729 | http://127.0.0.1:37899/status | tcp://127.0.0.1:42651 | 3 | 3.86 GiB | /tmp/dask-scratch-space/worker-m3uo6j1f |


In [2]:
import hyperspy.api as hs

In [4]:
# load the lazy dataset
s = hs.load("lazy_dataset.zspy", lazy=True)

In [None]:
# Let's try summing the data
summed = s.sum(axis=(2,3))

In [None]:
# And then computing the data
summed.compute()
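
If the difference between building the sum and computing it is unclear, this small sketch shows the same two steps on a synthetic dask array. The array shape and chunking are made up for illustration, and the example assumes axes 2 and 3 play the role of the two signal dimensions.

```python
import dask.array as da

# A synthetic 4D dataset: 4 x 4 scan positions, each with an 8 x 8 pattern of ones
data = da.ones((4, 4, 8, 8), chunks=(2, 2, 8, 8))

# Step 1: describe the sum. This only builds a task graph; no data is touched yet
lazy_sum = data.sum(axis=(2, 3))
print(type(lazy_sum).__name__, lazy_sum.shape)  # Array (4, 4)

# Step 2: compute. The scheduler now runs the tasks and returns a NumPy array
result = lazy_sum.compute()
print(result.shape, result[0, 0])  # (4, 4) 64.0
```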

In [None]:
# What about a slightly longer task?
from scipy.ndimage import center_of_mass

In [None]:
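import time

def center_of_mass_function_sleep(image, threshold):
    # Zero out pixels at or below the threshold so they do not bias the result
    bool_image = image > threshold
    com = center_of_mass(image * bool_image)
    time.sleep(0.01)  # artificial delay to make each task take longer
    return com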
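
To see what the thresholding step does, here is the same function applied to a tiny synthetic image: a uniform background of 10 counts with a single bright pixel. The test image and the threshold value are invented for this sketch.

```python
import time

import numpy as np
from scipy.ndimage import center_of_mass


def center_of_mass_function_sleep(image, threshold):
    bool_image = image > threshold            # keep only pixels above the threshold
    com = center_of_mass(image * bool_image)  # background pixels now contribute zero
    time.sleep(0.01)                          # artificial delay, as in the cell above
    return com


# 5 x 5 image: background of 10 everywhere, one bright pixel at row 1, column 3
image = np.full((5, 5), 10.0)
image[1, 3] = 100.0

com = center_of_mass_function_sleep(image, threshold=20)
print(tuple(float(c) for c in com))  # (1.0, 3.0)
```

Without the threshold, the background would pull the result away from the bright pixel and toward the middle of the image.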

In [None]:
# center of mass with a sleep added
s_com = s.map(center_of_mass_function_sleep,
              threshold=20,
              inplace=False)

In [None]:
# compute the center of mass
s_com.compute()

## Extra information:

If you want a more detailed walkthrough of the dashboard, you can always find more information [here](https://docs.dask.org/en/stable/dashboard.html). If you are seeing performance hits, it is often worth using the dashboard to find out why they are occurring.

Often it is a result of:

1. Overly complicated task graphs. Consider saving intermediate results and reloading them!
2. Slow transfers between workers. Consider larger chunks or a faster network!
3. Slow I/O. Look into buying arrays of hard disks. (__Seriously...__ they are relatively cheap, and your read speed will often scale with the number of disks.)
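
As one concrete illustration of the chunking advice, this sketch uses a synthetic dask array (shape and chunk sizes are made up) to show how larger chunks reduce the number of pieces the scheduler has to track and move between workers. Nothing is computed here; only the chunk layout changes.

```python
import dask.array as da

# Synthetic 4D dataset split into many small chunks
small_chunks = da.zeros((64, 64, 32, 32), chunks=(4, 4, 32, 32))
print(small_chunks.numblocks, small_chunks.npartitions)  # (16, 16, 1, 1) 256

# Same data, but with larger chunks: far fewer pieces to schedule and transfer
large_chunks = small_chunks.rechunk((16, 16, 32, 32))
print(large_chunks.numblocks, large_chunks.npartitions)  # (4, 4, 1, 1) 16

# Size of a single large chunk in MiB (float64 = 8 bytes per element)
chunk_mib = 16 * 16 * 32 * 32 * 8 / 2**20
print(chunk_mib)  # 2.0
```

Chunks that are too large can exhaust worker memory, so this is a trade-off rather than a rule.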

My biggest piece of advice is to talk to your IT department.