# Using dask-distributed on keeling


<div class="alert alert-block alert-success">
  1. Packages to install via <b>pip</b> or <b>conda</b>
</div>

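- `dask`
- `distributed` (Dask's distributed scheduler; with pip you can also install `dask[distributed]`)
- `dask-jobqueue` (provides `SLURMCluster`)
- `ipywidgets`
- `xarray` (used in section 5)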

<div class="alert alert-block alert-warning">
  2. Other steps
</div>

Copy `/data/keeling/a/snesbitt/.config/dask/jobqueue.yaml` to your `$HOME/.config/dask/jobqueue.yaml`.
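If you prefer to do this from Python (for example, from the notebook itself), here is a minimal standard-library sketch. `install_jobqueue_config` is a hypothetical helper name. It creates `~/.config/dask/` if needed and overwrites any existing `jobqueue.yaml` there.

```python
import shutil
from pathlib import Path

# Shared configuration file given in this guide.
SHARED_CONFIG = Path("/data/keeling/a/snesbitt/.config/dask/jobqueue.yaml")


def install_jobqueue_config(src: Path = SHARED_CONFIG, home: Path = Path.home()) -> Path:
    """Copy a jobqueue.yaml into <home>/.config/dask/, creating the folder if needed."""
    dest_dir = home / ".config" / "dask"
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / "jobqueue.yaml"
    shutil.copy(src, dest)  # overwrites an existing jobqueue.yaml
    return dest


# Usage on keeling:
# install_jobqueue_config()
```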

## Notes
Because of keeling's network configuration, workers can run on either the g nodes or the h nodes, but not both. The shared setup above only works on g nodes. To use h nodes instead, edit `$HOME/.config/dask/jobqueue.yaml` and change `g20` to `h20` under `job-extra:`. Do not try to mix the two node types; doing so causes network configuration problems.
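The edit can also be scripted. The sketch below assumes that the shared file lists its SLURM options as YAML list items under a `job-extra:` key (the file's exact contents are not reproduced in this guide), and it only rewrites `g20` on lines inside that block. `switch_node_type` is a hypothetical helper; opening the file in a text editor works just as well.

```python
def switch_node_type(yaml_text: str, old: str = "g20", new: str = "h20") -> str:
    """Replace `old` with `new` only on lines belonging to the `job-extra:` entry.

    Assumes a simple layout: the `job-extra:` key line, followed by lines that are
    either more indented or list items ("- ...") at the key's own indentation.
    """
    out = []
    key_indent = None  # indentation of the job-extra key while inside its block
    for line in yaml_text.splitlines(keepends=True):
        stripped = line.lstrip()
        indent = len(line) - len(stripped)
        if key_indent is not None and stripped.strip():
            in_block = indent > key_indent or (
                indent == key_indent and stripped.startswith("- ")
            )
            if not in_block:
                key_indent = None  # we have left the job-extra block
        if stripped.startswith("job-extra:"):
            key_indent = indent  # also covers inline lists on the key line
        if key_indent is not None:
            line = line.replace(old, new)
        out.append(line)
    return "".join(out)


# Usage (rewrites the file in place):
# from pathlib import Path
# cfg = Path.home() / ".config" / "dask" / "jobqueue.yaml"
# cfg.write_text(switch_node_type(cfg.read_text()))
```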


<div class="alert alert-info">
   3. Getting started
</div>


Start your notebook on the keeling head node. You may want to use the `screen` command to keep a semi-permanent session running there; use `screen -r` to reattach to it if you get disconnected.

Inside that `screen` session, start a Jupyter notebook server as usual, replacing `XXXX` with a free port:

`jupyter notebook --port=XXXX --ip=127.0.0.1 --no-browser`

Then, from your local machine, ssh to keeling forwarding that same port, and open `http://127.0.0.1:XXXX` in your browser:

`ssh keeling.earth.illinois.edu -L XXXX:127.0.0.1:XXXX`
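`ssh` accepts several `-L` options, so the Jupyter port and the Dask dashboard port used later in this guide can share one tunnel. A small sketch that builds such a command; `tunnel_command` is a hypothetical helper, and `8888` and `7999` are only example ports.

```python
def tunnel_command(*ports: int, host: str = "keeling.earth.illinois.edu") -> str:
    """Build one ssh command forwarding each local port to the same port on the remote host."""
    if not ports:
        raise ValueError("give at least one port")
    forwards = " ".join(f"-L {p}:127.0.0.1:{p}" for p in ports)
    return f"ssh {forwards} {host}"


# Example: Jupyter on 8888 and the Dask dashboard on 7999.
print(tunnel_command(8888, 7999))
# ssh -L 8888:127.0.0.1:8888 -L 7999:127.0.0.1:7999 keeling.earth.illinois.edu
```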

<div class="alert alert-info">
4. Coding using dask
</div>




```python
from dask_jobqueue import SLURMCluster
```

The configuration in `jobqueue.yaml` will use 20 cores on each `keeling` node by default.

However, we can also set up the cluster directly in the notebook, overriding those defaults:

```python
cluster = SLURMCluster(
    queue="seseml",
    memory='10GB',
    cores=10,
    processes=1,
    walltime='02:30:00',
    scheduler_options={
        'host': '172.22.179.3:7222',
        'dashboard_address': '7999',
    },
)
```

* `queue`: the SLURM partition (set of keeling nodes) to submit to (e.g., `'seseml'` or `'sesempi'`). Run `sinfo` in a keeling terminal to see which partitions are available.
* `memory`: amount of RAM to request per job (e.g., `'10GB'`).
* `cores`: number of cores per job (e.g., `10`).
* `processes`: number of Dask worker processes per job (e.g., `1`). The job's cores and memory are divided evenly among these workers.
* `walltime`: maximum run time of each job (e.g., `'02:30:00'`); SLURM stops the job, and its workers, once this limit is reached.
* `scheduler_options`: dictionary of keyword arguments for the Dask scheduler, which runs in this notebook on the head node:
    * `'host': '172.22.179.3:ZZZZ'`, the head node's address followed by a free port `ZZZZ` for the scheduler.
    * `'dashboard_address': 'WWWW'`. Port `WWWW` serves the Dask dashboard in your browser. It must also be tunneled, using `ssh keeling.earth.illinois.edu -L WWWW:127.0.0.1:WWWW`.
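A sketch that assembles these keyword arguments from a few numbers, plus the per-worker split: dask-jobqueue gives each worker `cores // processes` threads and `memory / processes` of RAM. Both helper names are hypothetical, and the defaults simply mirror the example above.

```python
def slurm_cluster_kwargs(queue, scheduler_port, dashboard_port, cores=10,
                         memory_gb=10, processes=1, walltime="02:30:00",
                         head_node_ip="172.22.179.3"):
    """Keyword arguments for SLURMCluster, in the same form as the example above."""
    return {
        "queue": queue,
        "memory": f"{memory_gb}GB",
        "cores": cores,
        "processes": processes,
        "walltime": walltime,
        "scheduler_options": {
            "host": f"{head_node_ip}:{scheduler_port}",  # scheduler address (ZZZZ)
            "dashboard_address": str(dashboard_port),    # dashboard port (WWWW)
        },
    }


def per_worker_resources(cores, memory_gb, processes):
    """Threads and memory (GB) per worker when one job runs `processes` workers."""
    return cores // processes, memory_gb / processes


# Usage on keeling (needs dask-jobqueue and SLURM):
# cluster = SLURMCluster(**slurm_cluster_kwargs("seseml", 7222, 7999))
print(per_worker_resources(cores=10, memory_gb=10, processes=1))  # (10, 10.0)
```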


Now we can scale the cluster. With `processes=1`, `cluster.scale(1)` requests one worker, that is, one SLURM job:

```python
cluster.scale(1)
```
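In dask-jobqueue, the number passed to `cluster.scale(n)` counts workers rather than jobs, and workers should be started in whole jobs of `processes` workers each; jobqueue clusters also accept `cluster.scale(jobs=n)`. A sketch of the resulting job count (`jobs_for_workers` is a hypothetical name):

```python
import math


def jobs_for_workers(n_workers, processes):
    """SLURM jobs started for a target of n_workers, rounding up to whole jobs."""
    return math.ceil(n_workers / processes)


print(jobs_for_workers(1, processes=1))  # 1, as in cluster.scale(1) above
print(jobs_for_workers(8, processes=1))  # 8
print(jobs_for_workers(5, processes=2))  # 3 jobs, i.e. 6 workers
```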

```python
cluster
```


Check which nodes were assigned:

```
%%bash
squeue -u $USER
```

```
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
            590773    seseml dask-wor alfonso8  R       0:00      1 keeling-d01
```
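To pick out the assigned nodes programmatically, the default `squeue` table can be parsed by splitting on whitespace. A sketch assuming the default column layout shown above; `parse_squeue` is a hypothetical helper.

```python
def parse_squeue(text):
    """Turn default `squeue` output into a list of dicts keyed by column header."""
    lines = [ln for ln in text.splitlines() if ln.strip()]
    if not lines:
        return []
    header = lines[0].split()
    jobs = []
    for ln in lines[1:]:
        # Split at most len(header) - 1 times so a NODELIST(REASON) value
        # containing spaces stays whole in the last column.
        fields = ln.split(None, len(header) - 1)
        jobs.append(dict(zip(header, fields)))
    return jobs


# On keeling:
# import os, subprocess
# out = subprocess.run(["squeue", "-u", os.environ["USER"]],
#                      capture_output=True, text=True).stdout
# print([j["NODELIST(REASON)"] for j in parse_squeue(out) if j["ST"] == "R"])
```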


```python
from dask.distributed import Client

client = Client(cluster)
client
```

```
Connection method: Cluster object
Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://172.22.179.3:7999/status
Comm: tcp://172.22.179.3:7222
Workers: 0
Total threads: 0
Total memory: 0 B
```

Workers show 0 until the worker inside the SLURM job has started and connected to the scheduler.


Next, define the function to map across the cluster. Here it simply waits one second and increments its input:

```python
import time

def slow_increment(x):
    time.sleep(1)
    return x + 1
```
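Before submitting 1000 one-second tasks, it can be worth checking the function on a few inputs locally. Dask's futures interface is modeled on Python's `concurrent.futures`, so the standard-library `ThreadPoolExecutor` gives a comparable `submit`/`result` dry run without touching SLURM. The function is redefined here only so the snippet is self-contained.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_increment(x):
    time.sleep(1)
    return x + 1


# Dry run on four inputs with local threads (takes about one second).
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(slow_increment, x) for x in range(4)]
    results = [f.result() for f in futures]

print(results)  # [1, 2, 3, 4]
```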

Now map the function over its inputs. `client.map` submits one task per element and returns a list of futures. The inputs could be a list of files or, as here, a range of numbers.

```python
from dask.distributed import progress

futures = client.map(slow_increment, range(1000))
progress(futures)
```

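The inputs to `client.map` could just as well be a list of files. A sketch under stated assumptions: `list_input_files` and `count_lines` are hypothetical stand-ins for your own file discovery and per-file processing, and the directory must be on a filesystem the compute nodes can also read, since the workers open the files themselves.

```python
from pathlib import Path


def list_input_files(directory, pattern="*.nc"):
    """Sorted paths (as strings) of files in `directory` matching `pattern`."""
    return sorted(str(p) for p in Path(directory).glob(pattern))


def count_lines(path):
    """Hypothetical per-file task: count the lines in a text file."""
    with open(path) as f:
        return sum(1 for _ in f)


# On the cluster ("/path/to/data" is a placeholder):
# files = list_input_files("/path/to/data")
# futures = client.map(count_lines, files)
# progress(futures)
# line_counts = client.gather(futures)
```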

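Can we speed it up? Let's scale up to 8 jobs. Because `processes=1`, each job runs a single worker, so `cluster.scale(8)` asks for 8 jobs. The existing `client` sees the new workers automatically, so there is no need to create another `Client`:

```python
cluster.scale(8)
```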

```
%%bash
squeue -u $USER
```

```python
futures = client.map(slow_increment, range(1000))
progress(futures)
```
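A back-of-the-envelope estimate of the gain: with `cores=10` and `processes=1`, each worker has 10 threads, so the one-second tasks run in waves of `workers * 10`. The sketch ignores scheduling overhead and time spent waiting in the SLURM queue, so it is a lower bound; `estimated_runtime` is a hypothetical helper.

```python
import math


def estimated_runtime(n_tasks, task_seconds, n_workers, threads_per_worker):
    """Lower-bound wall time: tasks run in waves of n_workers * threads_per_worker."""
    slots = n_workers * threads_per_worker
    return math.ceil(n_tasks / slots) * task_seconds


print(estimated_runtime(1000, 1.0, 1, 10))  # 100.0 seconds with one worker
print(estimated_runtime(1000, 1.0, 8, 10))  # 13.0 seconds with eight workers
```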

Other Dask interfaces that can run on this distributed cluster:

- [Dask DataFrame](https://docs.dask.org/en/latest/dataframe.html) for working with large pandas DataFrames
- [Dask Array](https://docs.dask.org/en/latest/array.html) mimics NumPy arrays
- [Dask Bag](https://docs.dask.org/en/latest/bag.html) mimics iterators, Toolz, and PySpark
- [Dask Delayed](https://docs.dask.org/en/latest/delayed.html) mimics for loops and wraps custom code
- [Dask concurrent.futures](https://docs.dask.org/en/latest/futures.html) interface provides general submission of custom tasks

<div class="alert alert-info">
5. Xarray and Dask
</div>

```python
import xarray as xr
```

#### Sample dataset

```python
ds = xr.tutorial.open_dataset('air_temperature',
                              chunks={'lat': 25, 'lon': 25, 'time': -1})
ds
```
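With these chunks, each dimension is cut into blocks of the given size, and `-1` means one chunk spanning the whole dimension, so every grid point's full time series sits in a single chunk. A sketch of the resulting layout, assuming the tutorial dataset's dimension sizes are time=2920, lat=25, lon=53; on the real object, `ds.chunks` reports the same information. `chunk_shape` is a hypothetical helper.

```python
import math


def chunk_shape(dim_len, chunk):
    """Chunk sizes along one dimension; -1 means one chunk spanning the whole dimension."""
    if chunk == -1 or chunk >= dim_len:
        return (dim_len,)
    full, rest = divmod(dim_len, chunk)
    return (chunk,) * full + ((rest,) if rest else ())


# Assumed dimension sizes of the air_temperature tutorial dataset.
sizes = {"time": 2920, "lat": 25, "lon": 53}
chunks = {"lat": 25, "lon": 25, "time": -1}
layout = {dim: chunk_shape(n, chunks[dim]) for dim, n in sizes.items()}
n_chunks = math.prod(len(c) for c in layout.values())
print(layout)    # {'time': (2920,), 'lat': (25,), 'lon': (25, 25, 3)}
print(n_chunks)  # 3
```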

```python
# Select the air temperature DataArray
da = ds['air']
da
```

```python
# Monthly climatology: mean over all years for each calendar month
da2 = da.groupby('time.month').mean('time')
```

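```python
# Anomaly: subtract from each time step the climatology of its own month.
# Plain `da - da2` would broadcast over a new `month` dimension instead.
da3 = da.groupby('time.month') - da2
```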
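The monthly climatology averages all time steps that fall in the same calendar month, and the anomaly subtracts from each time step the climatology of its own month; in xarray, the grouped subtraction `da.groupby('time.month') - da2` performs that matching. A plain-Python sketch of the two steps on made-up values (`monthly_climatology` and `monthly_anomaly` are hypothetical helpers):

```python
from collections import defaultdict
from datetime import datetime


def monthly_climatology(times, values):
    """Mean of `values` for each calendar month, like groupby('time.month').mean()."""
    groups = defaultdict(list)
    for t, v in zip(times, values):
        groups[t.month].append(v)
    return {month: sum(vs) / len(vs) for month, vs in groups.items()}


def monthly_anomaly(times, values):
    """Subtract each value's calendar-month mean, like da.groupby('time.month') - clim."""
    clim = monthly_climatology(times, values)
    return [v - clim[t.month] for t, v in zip(times, values)]


# Toy data: two Januaries and two Februaries.
times = [datetime(2013, 1, 1), datetime(2014, 1, 1),
         datetime(2013, 2, 1), datetime(2014, 2, 1)]
values = [240.0, 244.0, 250.0, 252.0]
print(monthly_climatology(times, values))  # {1: 242.0, 2: 251.0}
print(monthly_anomaly(times, values))      # [-2.0, 2.0, -1.0, 1.0]
```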

```python
# da3 is lazy: only the task graph has been built, nothing is computed yet
da3
```

```python
# Dask and xarray use the cluster set up above to compute da3.
# To watch the dask-dashboard, open http://127.0.0.1:WWWW/ in your browser after tunneling port WWWW.
da3.compute()
```

```python
# Plot the standard deviation over time of the weekly means
da.resample(time='1W').mean('time').std('time').load().plot(figsize=(12, 8))
```
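The plotted quantity is the standard deviation, over time, of weekly means. A plain-Python sketch of the same reduction on toy data, approximating weekly bins with ISO calendar weeks (pandas' weekly anchoring may place bin edges differently) and using the population standard deviation, which matches xarray's default `ddof=0`. The helper names are hypothetical.

```python
import statistics
from collections import defaultdict
from datetime import datetime, timedelta


def weekly_means(times, values):
    """Mean per ISO (year, week), an approximation of resample(time='1W').mean()."""
    groups = defaultdict(list)
    for t, v in zip(times, values):
        year, week, _ = t.isocalendar()
        groups[(year, week)].append(v)
    return [sum(vs) / len(vs) for _, vs in sorted(groups.items())]


def std_of_weekly_means(times, values):
    """Population standard deviation (ddof=0) of the weekly means."""
    return statistics.pstdev(weekly_means(times, values))


start = datetime(2013, 1, 7)  # a Monday
times = [start + timedelta(days=d) for d in range(14)]
values = [1.0] * 7 + [3.0] * 7
print(weekly_means(times, values))         # [1.0, 3.0]
print(std_of_weekly_means(times, values))  # 1.0
```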

<div class="alert alert-info">
6. Finally, close the client and the cluster.
</div>


```python
client.close()
cluster.close()
```