# Using Dask within an HPC Environment
### RMACC 2022 Tutorial | 4 August, 2022

<img src=images/NCAR-contemp-logo-blue.png width=500px alt="NCAR Logo">

Brian Vanderwende  
HPC User Support Consultant  
[vanderwb@ucar.edu](mailto:vanderwb@ucar.edu)
---

#### Setting up Python to run Dask

The `miniconda` package environment manager makes it very easy to deploy Dask anywhere.

1. If conda is not available on your system, first install it:

```
# For example - on x86 Linux platforms
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
# Now start a new shell to finish conda integration into your environment
```

2. Use conda (or mamba) to create a new Python environment with required and [useful] packages:

```
conda create -n my-dask-env -c conda-forge dask dask-jobqueue [matplotlib python-graphviz jupyterlab ipywidgets dask-labextension globus-cli]
```

3. Activate the environment and run `python`, `ipython`, or use in Jupyter as a language kernel

#### Downloading the dataset

If you wish to follow along during or after the tutorial session, you will need to download the data we use in the following commands. I have made these data available via a Globus endpoint, which you can access at https://www.globus.org. Search for the *collection* called `RMACC 2022 - HPC Dask Tutorial` and download the data to the main directory of the GitHub repo (the one containing this notebook).

## Arrays, series, and dataframes - why we like Dask
Complex data structures enable data science in Python:
* [NumPy arrays](https://numpy.org/doc/stable/)
* [Pandas series and dataframes](https://pandas.pydata.org/)
* [XArray arrays](https://docs.xarray.dev/)

*But datasets are getting larger all of the time! What if my data science is too big to fit into memory, or takes too long to complete an analysis?*

### Dask increases the size of possible work from *fits-in-memory* to *fits-on-disk* (sometimes doing it faster) via distributed parallelism

That said, **Dask should only be used when necessary as it incurs overhead.** Avoid Dask if you can easily:
* Speed up your code with use of compiled routines in libraries like NumPy
* Profile and optimize your serial code to minimize bottlenecks
* Read in a subset of data to gain the insight you need

## Warming up with some Pandas


In [None]:
# Make plots display inline in the notebook
%matplotlib inline

At NCAR, we have created a nice utility called `qhist` to provide historical data on interactive and batch compute jobs.

#### Load historical job data from 2022 June

In [None]:
import pandas as pd
june_jobs = pd.read_csv('data/jobs-202206.dat')
jj = june_jobs

In [None]:
# Display the top five rows of the dataframe
jj.head()

#### How many jobs did users run in June?

In [None]:
len(jj)

#### How many jobs ran in each queue?

In [None]:
# Create a "groupby object" based on the selected columns
# which can return info like the count of each type of the column value
jj.groupby('Queue').size()

In [None]:
# Here we easily plot the prior data using matplotlib from pandas
jj.groupby('Queue').size().plot.bar(logy = True)

#### Where do Dask jobs run?

In [None]:
# Select only one column of data
jj['Job Name'].head()

In [None]:
# Get the number of each value in Job Name (similar to size above)
jj['Job Name'].value_counts().head()

In [None]:
# Create a new dataframe of job entries with the Job Name "dask-worker"
dj = jj[jj['Job Name'] == 'dask-worker']
dj.groupby('Queue').size()

*Is this what we expect? Dask tasks tend to be **bursty, high-throughput** computing.*

#### What are typical worker CPU and memory needs?

In [None]:
# groupby operations can be on multiple columns and ones generated by manipulating a column
dj.groupby(['NCPUs', dj['Req Mem (GB)'] // 10 * 10]).size()

#### How much of the request memory is wasted?

In [None]:
# Creating a new column
dj['Unused Mem'] = dj['Req Mem (GB)'] - dj['Used Mem(GB)']

In [None]:
# Check all datatypes of the dataframe (here we see usedmem did not get interpreted properly)
dj.dtypes

In [None]:
# Let's cast used mem as the dtype that we want (float)
dj['Used Mem(GB)'] = dj['Used Mem(GB)'].astype(float)

In [None]:
# To avoid the above problem, we need to make sure we copy data in our dask job dataframe, and not reference it
dj = jj[jj['Job Name'] == 'dask-worker'].copy()
dj['Used Mem(GB)'] = dj['Used Mem(GB)'].astype(float)
dj.dtypes

In [None]:
dj['Unused Mem (%)'] = (dj['Req Mem (GB)'] - dj['Used Mem(GB)']) / dj['Req Mem (GB)'] * 100.0

In [None]:
dj.groupby(dj['Unused Mem (%)'] // 10 * 10).size().plot.bar(xlabel = '% Unused', ylabel = 'Number of Jobs')

*Typically on busy systems, wasted resources means less job throughput, meaning...*  

**LONGER WAIT TIMES :-(**

#### How much of the requested walltime do workers actually use?

In [None]:
dj['Job Start'].head()

In [None]:
# This could also be done with a list comprehension!
time_cols = ['Job Submit', 'Job Start', 'Job End']

for col in time_cols:
    dj[col] = pd.to_datetime(dj[col])

In [None]:
dj.dtypes

In [None]:
dj['elapsed'] = (dj['Job End'] - dj['Job Start']).dt.total_seconds() / 3600

In [None]:
dj.groupby(dj['Walltime (h)'] // 1).mean()['elapsed']

#### How big is the June 2022 dataset?

In [None]:
# Shell commands can be run with "!' in a notebook
!ls -lh data/jobs-202206.dat

In [None]:
# dataframe method to get metadata
jj.info()

In [None]:
import sys

# Define function to display variable size in MiB
def var_size(in_var):
    result = sys.getsizeof(in_var) / 1024 / 1024
    print(f"Size of variable: {result:.2f} MiB")

In [None]:
var_size(jj)

In [None]:
# deep option will include memory used by mutable objects
jj.info(memory_usage='deep')

---
## Scaling up to larger datasets

In [None]:
!ls -lh data/jobs-*

#### Get the busiest day in the dataset
Pandas can concatenate data to load data spread across multiple files:
```
import glob
df = pd.concat(pd.read_csv(f) for f in glob.glob("data/jobs-*"))
```
*However, this may exhaust memory if the data are large enough*  

For this problem, we can first get the busiest day of each month, and then find the busiest among the months

In [None]:
import time, glob

def busy_day(pbs_file):
    print(pbs_file)
    df = pd.read_csv(pbs_file, parse_dates = time_cols)
    result = df.groupby(df['Job Start'].dt.date).size().agg(['idxmax','max'])
    result = result.to_frame().transpose()
    return result

In [None]:
%%time
bd = pd.concat([busy_day(f) for f in glob.glob("data/jobs-*")]).set_index('idxmax')
bd['max'].astype(int).idxmax()

***Dask allows us to conceptualize all of these files as a single dataframe!***

In [None]:
# Let's do a little cleanup (Python will not return the memory to the OS, however)
del jj, dj, bd

---
<img src=images/dask_horizontal.svg width=800px alt="Dask Logo"></img>  
*Image credit: Anaconda, Inc. and contributors*

## What is Dask?

Dask is a Python package for scaling up data science workflows in parallel.

* Features task-based parallelism
* Can easily scale from 1 to 1000s of workers
* Inherits subset of objects and methods from libraries like NumPy, Pandas, XArray, and scikit-learn
* Prototype code serially and then easily parallelize using Dask
* Smoothly run the same workflow on laptops to clusters via `schedulers`

#### Dask collections

A Dask *collection* is the fundamental thing we wish to parallelize. Most of the time, you will probably use one of the following *high-level* (big)data structures:

| Collection | Serial | Dask |
|-|-|-|
| Arrays | numpy.array | dask.array.from_array |
| Dataframes | pandas.read_csv | dask.dataframe.read_csv |
| Unstructured | [1,2,3] | dask.bag.from_sequence([1,2,3]) |

Dask also features two *low-level* collection types - `delayed` and `futures`.

* **delayed** - run any arbitrary Python function using Dask task parallelism (think looped function calls)
* **futures** - similar to delayed but allows for concurrency on the client (think backgrounded processes)

#### Lazy evaluation

Many programming languages (*including Python!*) use eager/strict evaluation. For example:

In [None]:
x = 4
x = x + 2
x

As soon as the cell is run, the assignment commands are executed and the value of `x` is changed.  

Most Dask collections instead use *lazy evaluation*. When a command is run, a `delayed` object is returned that contains the *task graph* - providing instructions to the Dask workers on what to do. However, nothing happens until you tell it to with `.compute()`!

*The one exception is Dask futures, which are executed as soon as the requisite data are available.*

#### Using Dask at its most basic...

In [None]:
import numpy as np
import dask.array as da

# Create a 2D NumPy array
narr = np.random.random((10000,10000))

In [None]:
# Split the array into chunks for Dask to parallelize over
darr = da.from_array(narr, chunks=(5000, 5000))

In [None]:
var_size(narr)

In [None]:
# Remember, this variable is only a facimile of the full array which will be split across workers
var_size(darr)

In [None]:
# Dask does give us ways to see the full size of the data (often much larger than your client machine!)
print("Size of dataset:  {:.2f} MiB".format(darr.nbytes / 1024**2))

In [None]:
darr

In [None]:
%%time
# The %%time magic measures the execution time of the whole cell
narr.mean()

In [None]:
%%time
# Remember, we are not doing any computation here, just constructing our task graph
tg = darr.mean()

In [None]:
%%time
tg.compute()

In [None]:
del narr, darr

####
![Dask flow](images/dask-overview.svg)  
*Image credit: Anaconda, Inc. and contributors*
---
## Dask Distributed - using *clusters* to scale

#### The Dask scheduler - our task orchestrator

When a computational task is submitted, the Dask distributed *scheduler* sends it off to a Dask *cluster* - simply a collection of Dask *workers*. A worker is typically a separate Python process on either the local host or a remote machine.  

* **worker** - a Python interpretor which will perform work on a subset of our dataset
* **cluster** - an object containing instructions for starting and talking to workers
* **scheduler** - sends tasks from our task graph to workers
* **client** - a local object that points to the scheduler (*often local but not always*)

There are many ways to spread these components across local and remote computers, but we will focus on the most common in HPC contexts:

1. Create a cluster pointing to local or scheduled remote resources
2. Create a client pointing to a local scheduler
3. Submit work to the client
4. Run `.compute()` to begin worker execution
5. Repeat steps 3+4 as necessary
6. Shut down the Dask client and workers

## Let's return to our HPC job dataset

#### Create a "LocalCluster" Client with Dask

In [None]:
import glob
from dask.distributed import LocalCluster, Client
cluster = LocalCluster()
cluster

In [None]:
client = Client(cluster)
client

#### Load our jobs dataset into a Dask dataframe

In [None]:
import dask.dataframe as dd
job_files = glob.glob('data/jobs-*.dat')

In [None]:
# Avoid reading the data in with pandas, even if it can fit within your client machine's memory
djj = dd.read_csv(job_files, parse_dates = time_cols)
djj

* As promised, data has not yet been read into the dataframe (lazy evaluation)
* So how does Dask know the `dtype` of each column?
* Note the number of tasks... do they match the number of workers?

In [None]:
djj.head()

In [None]:
# We could use the Dask recommendation, but some inspection of the data reveals that
# the value used for missing is the problem: the string '-'
djj = dd.read_csv(job_files, parse_dates = time_cols, na_values = '-')

In [None]:
djj.head()

#### Now we can use Dask to find the busiest day in the dataset

In [None]:
# Many of the pandas operations work the same way on a Dask dataframe
top_day = djj.groupby(djj['Job Start'].dt.date).size()
top_day

In [None]:
%%time
r = top_day.compute()
r.agg(['idxmax','max'])

#### What about the mean memory usage of Dask jobs by month?

In [None]:
r = djj[djj['Job Name'] == 'dask-worker'].groupby(djj['Job Start'].dt.month).mean()['Used Mem(GB)']

In [None]:
r.compute()

In [None]:
# Some operations allowed in pandas would not be reliable to run in parallel, so we must adjust our code
ddj = djj[djj['Job Name'] == 'dask-worker']
t = ddj.groupby([ddj['Job Start'].dt.year, ddj['Job Start'].dt.month]).mean()['Used Mem(GB)']
r = t.compute().sort_index()

In [None]:
r

In [None]:
# This only uses the results data computed by Dask, so it should not launch any new parallel work
r.plot.area(xlabel = 'Month', ylabel = 'Used Mem (GB)')

#### Nice, but what did Dask do?

In [None]:
# Requires ipywidgets
t.dask

In [None]:
# Requires python-graphviz (not pygraphviz!)
t.visualize()

Using a client with a `LocalCluster` gives us more insight and control over the simple thread pool approach

* Set number of workers and threads-per-worker
* *Plus many other configuration options!*
* See the client / cluster diagnostics HTML page

In [None]:
# Shutdown the client & local cluster
client.shutdown()

---
## Using Dask on an HPC cluster

Fundamentally, the only thing we need to do to switch to a cluster environment is provide Dask with a way to access workers on other hosts.

So we switch from a `LocalCluster` to a distributed one that uses SSH, Kubernetes, or a batch scheduler to access worker resources. Typical HPC environments will use  *Slurm*, *PBS*, *LSF*, etc.

#### `dask-jobqueue` provides batch cluster types

```
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(processes=1,
                       threads=2,
                       memory="10GB",
                       project="ABC1234",
                       walltime="01:00:00",
                       queue="htc")
```

This provides a template with which the Dask and batch schedulers can spin up workers.


#### Let's start a PBS Cluster
*The following cells are designed to run on the Casper system at NCAR. The steps from here on out will need to be modified for your particular system.*

In [None]:
from dask_jobqueue import PBSCluster
from dask.distributed import Client

In [None]:
cluster = PBSCluster(
     cores=1,
     memory='2GiB',
     processes=1,
     local_directory='$TMPDIR/dask/spill',
     resource_spec='select=1:ncpus=1:mem=2GB',
     queue='casper',
     walltime='10:00',
     interface='ib0'
)

#### Validate the configuration by printing the job script

In [None]:
print(cluster.job_script())

Things I'm looking for:
1. PBS resource request matches what I'm assigning to the worker (e.g., memory units!)
2. Where are my job logs going? (for debugging)
3. Are there configuration settings I didn't expect?

#### Scaling the batch cluster

By default, our `PBSCluster` starts with no active workers. We need to *scale* the cluster up to fit our needs. Clusters can be scaled number or resource. The following are equivalent:
```
cluster.scale(4)
cluster.scale(cores=4)
cluster.scale(memory='8 GB')
```
Running these commands will immediately submit batch jobs to start workers, so it's typical to wait until we're ready to do work...

In [None]:
#client = Client(cluster)
# It can be nice to wait longer for client success on busy / sluggish systems
client = Client(cluster, timeout="60s")

In [None]:
# Lets use our job scheduler to watch the workers spin up
!qstat -u vanderwb

In [None]:
# The wait blocks our notebook until the workers are ready - a synchronization point
cluster.scale(4)
client.wait_for_workers(4)
client.ncores()

In [None]:
!qstat -u vanderwb

#### Now, let's again find the busiest day in our logs

In [None]:
job_files = glob.glob('data/jobs-*.dat')
djj = dd.read_csv(job_files, parse_dates = time_cols, na_values = '-')
top_day = djj.groupby(djj['Job Start'].dt.date).size()

In [None]:
%%time
r = top_day.compute()
r.agg(['idxmax','max'])

#### What if we double the size of our cluster?

In [None]:
cluster.scale(8)
client.wait_for_workers(8)
client.ncores()

*Are we creating 8 new workers here?*

In [None]:
%%time
r = top_day.compute()
r.agg(['idxmax','max'])

In [None]:
# It's always a good idea to scale down when done - otherwise we waste compute resources
cluster.scale(0)
client.ncores()

In [None]:
client.shutdown()

#### Clusters can also adaptively scale

For interactive, exploratory work, *adaptive scaling* can be useful. This allows the cluster to dynamically scale up and down based on the (Dask) scheduler's estimation of resource needs. This capability is highly customizable, but one basic method would be to set bounds on the number of worker jobs that can be used:
```
cluster.adapt(minimum=0, maximum=12)
```

### Performance Considerations
* Ideally, you want to have more partitions than workers (*and evenly divisible!*) so that no workers remain idle for long
* Using **processes** for workers avoids issues with Python's global interpretor lock (GIL) and is a reasonable default approach
* Using **threads** for workers may be beneficial when running libraries that bypass the GIL with compiled code like NumPy  
  *If you use threads, make sure you pay attention to settings like `OMP_NUM_THREADS` when using C extensions*
* Very deep task graphs (millions of tasks) can have overhead in the hours - use larger partitions if possible in this scenario
* When using a distributed cluster, manually **persist** reused data into worker memory (loading from RAM is much faster than disk)

```
# Here we persist the dataframe after setting the index, since that is a heavyweight operation we do not wish to repeat!
df = dd.read_csv(data_files)
df = df.set_index('timestep')
df = client.persist(df)

res1 = df.groupby('location').size().compute()
res2 = df.groupby('location').mean().compute()
```

### Configuring Dask
When scaling up from local (workstation) to distributed (HPC cluster) usage, it becomes very important to customize how Dask works. There are multiple ways to change configuration, from high to low precedence:

1. Runtime setting within Python itself
2. Environment variables before launching Python
3. User-specific YAML config file in `~/.config/dask`
4. python/etc/dask
5. /etc/dask or $DASK_ROOT_CONFIG if set

In [None]:
# YAML configuration settings can be retrieved from within Python too
from dask import config
config.refresh()
config.get('jobqueue.pbs')

## Logs and tracking the cluster and worker state

A lot of information and debug output can be found in the worker logs. When using `dask-jobqueue`, these logs will be written to the job log location shown in the job script above. Write them to an easy to find location, and be mindful that you may generate a lot of files!

In [None]:
!ls dask-worker-logs

*Using a directory specific to the workflow, date, or job ID can help avoid log file confusion.*

You can also print out the logs from **active** workers using the following client method:
```
client.get_worker_logs()
```

#### Setting log verbosity
Dask uses standard Python logging levels, shown here from least to most verbose:  
```
CRITICAL -> ERROR -> WARNING -> INFO -> DEBUG
```
These settings can be set in YAML config files or in the script. For example:
```
logging:
  distributed.client: info
  distributed.nanny: info
  distributed.worker: debug
```

## Some observations from NCAR support

#### Workers take too long to start

For any number of reasons, workers may take a while to start, causing the `nanny` to terminate the worker and your cluster scaling to fail. You may see messages like this in your logs:
```
2022-07-19 14:25:10,663 - distributed.nanny - INFO - Closing Nanny at 'tcp://10.12.206.4:44646'.
2022-07-19 14:25:10,667 - distributed.nanny - INFO - Nanny asking worker to close
2022-07-19 14:25:13,617 - distributed.nanny - WARNING - Worker process still alive after 3.9999900817871095 seconds, killing
2022-07-19 14:25:13,674 - distributed.dask_worker - INFO - Timed out starting worker
2022-07-19 14:25:13,677 - distributed.dask_worker - INFO - End worker
```
**Solution:** increase the `death-timeout` parameter from 60 seconds to a higher number. Note that this parameter is used to terminate desync'd workers, so avoid setting it too high!

#### Worker is killed
```
KilledWorker: ('__call__-6af7aa29-2a09-45f3-a5e2-207c06562672', <Worker 'tcp://10.194.211.132:11927', memory: 0, processing: 1>) 
```

There are many possibilities here unfortunately, but common ones can include out-of-memory conditions (easier when spilling to disk is disabled) or running out of disk space/quota.

#### Running your Dask tasks serially is powerful in debugging
If you can temporarily switch to the non-distributed scheduler (e.g., you aren't using any asynchronous features like futures), you can run that one in single-thread mode. Then, you can use the Python debugger or the `%debug` cell magic in a notebook to examine the call stack upon an exception.
```
da.compute(scheduler="single-threaded")
```
[This StackOverflow post](https://stackoverflow.com/questions/44193979/how-do-i-run-a-dask-distributed-cluster-in-a-single-thread) gives detail on running the distributed scheduler with a single thread, but it is not as simple.

#### A not-insignificant number of errors are fixed by upgrading software...

## Suggested Best Practices

**General Dask use**
* Before starting, analyze whether your problem truly justifies Dask overhead
* If possible, prototype your code serially and then augment with distributed Dask

**Dask configuration**
* For reproducibility, ease of sharing, and clarity, put required settings in the script or notebook
* Quality of life and/or community standard practices can go in user YAML config file

**Larger scale considerations**
* Many workers can put high load on network file systems in spill situations; disable spill or use local storage
* Dask has *experimental* support for UCX for worker communication to use the full speed of a high-speed network

**Resource utilization**
* Choose generous memory defaults at first, **but then refine/reduce!**
* Avoiding swapping to disk as much as possible; swap causes MAJOR performance penalties
* Delete your cluster object once done with it to terminate idle worker jobs

**Job turnaround**
* Keep worker resource needs low: most times this will decrease scheduler wait times
* Use adaptive scaling during exploratory work and manually scale in production
* If your scheduler is tuned for large jobs, consider using [dask-mpi](http://mpi.dask.org/en/latest/) instead of dask-jobqueue

## Resources for further learning
* [Dask documentation](https://docs.dask.org/en/stable/)
* [Dask Distributed documentation](https://distributed.dask.org/en/stable/)
* [Dask-jobqueue documentation](https://jobqueue.dask.org/en/latest/)
* [Using Dask with GPUs](https://docs.dask.org/en/stable/gpu.html)

---
## Addendum: Using JupyterLab on HPC systems

Ideally, you have access to JupyterHub, which provides a web portal to notebooks, terminals, and Dask Distributed dashboards. If not, you will need to create SSH tunnels to forward the port for Jupyter *and, if desired, the Dask dashboard.*  

**Remote System**
```
conda activate my-dask-env
jupyter lab --no-browser [--port 8888]
```
**Local System**
```
ssh -N -L 8888:localhost:8888 remote.hpc.system.edu
```
Then, you would navigate to `http://localhost:8888` in your browser and sign into JupyterLab. Once you start a distributed dask cluster, you will then have its port number (8787 by default if unoccupied).  

Fortunately, you do not need to forward the Dask cluster port, as Jupyter can proxy it for you. You instead can use the following URL in your browser:
```
http://localhost:<jupyter_port>/proxy/<cluster_port>/status
```