## Using dask on a SLURM HPC
This notebook demonstrates how to run tasks in parallel using dask and SLURM.

### Installing the packages
```
pip install dask
pip install distributed
pip install cloudpickle
```

### Starting the dask cluster
There is a good explanation of how this works on [Matthew Rocklin's blog](http://matthewrocklin.com/blog/work/2016/02/26/dask-distributed-part-3). In summary, we need to:

1. Start `dask-scheduler` on one machine. This is the node Python connects to.
2. Start an arbitrary number of `dask-worker`s on the cluster using SLURM.

#### 1. Starting `dask-scheduler`
This is fairly simple: open a shell, type `dask-scheduler`, and press Enter.
You will see something like:

```
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at:        10.114.185.14:8786  
distributed.scheduler - INFO -        http at:        10.114.185.14:9786
distributed.bokeh.application - INFO - Web UI: http://10.114.185.14:8787/status/
distributed.scheduler - INFO - -----------------------------------------------
```

You will need the IP address and port listed under *Scheduler at*, both to connect the workers and to connect from Python.
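If you capture the scheduler output in a file or variable, the address can be pulled out programmatically. The following is a minimal sketch: the helper name `parse_scheduler_address` is hypothetical, and the optional scheme prefix (for example `tcp://`) in the pattern is an assumption to tolerate other log formats.

```python
import re

# Matches the "Scheduler at:" line of the dask-scheduler startup log.
# The optional "scheme://" prefix is an assumption, not taken from the log above.
_SCHEDULER_LINE = re.compile(r"Scheduler at:\s*(?:\w+://)?([\w.\-]+):(\d+)")


def parse_scheduler_address(log_text):
    """Return (host, port) from dask-scheduler log output, or None if absent."""
    match = _SCHEDULER_LINE.search(log_text)
    if match is None:
        return None
    return match.group(1), int(match.group(2))


log = """\
distributed.scheduler - INFO -   Scheduler at:        10.114.185.14:8786
distributed.scheduler - INFO -        http at:        10.114.185.14:9786
"""

host, port = parse_scheduler_address(log)
address = f"{host}:{port}"
print(address)  # 10.114.185.14:8786
```

The resulting `address` string is what both the workers and the Python session need.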


#### 2. Starting `dask-worker`s
I prepared a simple shell script for that. Open a shell and run `./submit-workers.sh <SCHEDULER_HOST> <NUMBER_OF_WORKERS>`, where `<SCHEDULER_HOST>` is the scheduler address including the port. The script submits your workers to SLURM.

**Example**: for the scheduler shown above, this would be:
```
./submit-workers.sh 10.114.185.14:8786 10
```

**Note**: Each worker is set up to use 8 threads, so the above command reserves 80 CPUs (10 workers × 8 threads). You can adjust this and other options in `./start-worker.sh`.
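The scripts themselves are not shown here, so the following is only a hypothetical Python sketch of what such a submission step could look like, together with the CPU arithmetic from the note. The function name `build_worker_submissions`, the job names, and the use of `sbatch --wrap` are assumptions, not the contents of `submit-workers.sh`. The sketch only builds the command lines and does not submit anything. Check the flag names against your installed SLURM and dask versions.

```python
import shlex


def build_worker_submissions(scheduler_address, n_workers, threads_per_worker=8):
    """Build one hypothetical `sbatch` command per worker (nothing is submitted)."""
    worker_cmd = f"dask-worker {scheduler_address} --nthreads {threads_per_worker}"
    return [
        [
            "sbatch",
            f"--cpus-per-task={threads_per_worker}",
            f"--job-name=dask-worker-{i}",  # job name is an illustrative choice
            f"--wrap={worker_cmd}",
        ]
        for i in range(n_workers)
    ]


N_WORKERS = 10
THREADS_PER_WORKER = 8

commands = build_worker_submissions("10.114.185.14:8786", N_WORKERS, THREADS_PER_WORKER)
total_cpus = N_WORKERS * THREADS_PER_WORKER

print(total_cpus)  # 80
print(shlex.join(commands[0]))  # shell-quoted form of the first submission
```

To actually submit, each command list could be passed to `subprocess.run`, but it is worth inspecting the printed commands first.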

### Let's try it

In [26]:
from distributed import Executor, progress

In [27]:
# Connect to the address printed under "Scheduler at" by dask-scheduler
e = Executor("10.114.185.14:8786")

Dummy function that we want to apply: 

In [24]:
def heavy_stuff(number):
    # Burn CPU time; the intermediate result is deliberately discarded
    for _ in range(1000000):
        _ = number ** number
    return number ** 2

Dummy objects:

In [28]:
objects = list(range(20))

### Running this locally takes some time: 

In [29]:
list(map(heavy_stuff, objects))

[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361]
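To put a number on "some time", the local run can be wrapped in a small timer. This is a minimal sketch: the `timed` helper and the `repeats` parameter are additions for illustration, and `repeats` is scaled down so the demo finishes quickly. Set it back to 1000000 to match the workload above.

```python
import time


def heavy_stuff(number, repeats=10_000):
    """Same dummy workload as above; `repeats` is scaled down for a quick demo."""
    for _ in range(repeats):
        _ = number ** number
    return number ** 2


def timed(func, *args, **kwargs):
    """Call func and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start


objects = list(range(20))
results, seconds = timed(lambda: [heavy_stuff(x) for x in objects])
print(f"local run: {seconds:.3f} s for {len(objects)} tasks")
```

The same `timed` helper can wrap the cluster run to compare the two wall-clock times.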

### Running it on the cluster: 

In [33]:
futures = e.map(heavy_stuff, objects)

In [34]:
progress(futures)

In [35]:
[f.result() for f in futures]

[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361]
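The submit-then-collect pattern used above has the same shape as the standard library's `concurrent.futures`. The sketch below uses a `ThreadPoolExecutor` as a local stand-in, so it is not dask itself. The helper `run_all` is hypothetical and assumes only that the object passed in has a `submit(func, arg)` method returning a future with `.result()`, which the distributed client also provides. Note that newer releases of `distributed` name the class `Client` rather than `Executor`.

```python
from concurrent.futures import ThreadPoolExecutor


def square(number):
    """Cheap stand-in for the heavy function."""
    return number ** 2


def run_all(client, func, items):
    """Submit one task per item and block until every result is available."""
    futures = [client.submit(func, item) for item in items]
    return [f.result() for f in futures]


# Local thread pool standing in for the cluster connection (an assumption
# made so the sketch runs without a scheduler).
with ThreadPoolExecutor(max_workers=4) as pool:
    results = run_all(pool, square, range(20))

print(results[:5])  # [0, 1, 4, 9, 16]
```

Results come back in submission order because the futures are collected in the order they were created, regardless of which task finishes first.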