# Dask Overview

Dask is a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple. On the CPU, Dask uses Pandas (NumPy) to execute operations in parallel on DataFrame (array) partitions.

Dask-cuDF extends Dask where necessary to allow its DataFrame partitions to be processed by cuDF GPU DataFrames as opposed to Pandas DataFrames. For instance, when you call dask_cudf.read_csv(…), your cluster’s GPUs do the work of parsing the CSV file(s) with underlying cudf.read_csv(). Dask also supports array based workflows using CuPy.

## When to use Dask
If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF or CuPy. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask.

One additional benefit Dask provides is that it lets us easily spill data between device and host memory. This can be very useful when we need to do work that would otherwise cause out of memory errors.

### Multi-GPU and multi-node scaling

**Dask** is a task-based library for parallel scheduling and execution. Although it is certainly possible to use the task-scheduling machinery directly to implement customized parallel workflows (we do it in NVTabular), most users only interact with Dask through a Dask Collection API. The most popular "collection" API's include:

* Dask DataFrame: Dask-based version of the Pandas DataFrame/Series API. Note that dask_cudf is just a wrapper around this collection module (dask.dataframe).
* Dask Array: Dask-based version of the NumPy array API
* Dask Bag: Similar to a Dask-based version of PyToolz or a Pythonic version of PySpark RDD

For example, Dask DataFrame provides a convenient API for decomposing large Pandas (or cuDF) DataFrame/Series objects into a collection of DataFrame partitions.

<img src="./imgs/dask-dataframe.svg" width="20%">

We use **dask_cudf** to process large datasets as a collection of cuDF dataframes instead of Pandas. CuDF is a GPU DataFrame library for loading, joining, aggregating, filtering, and otherwise manipulating data.
<br><br>
**Dask enables easily to schedule tasks for multiple workers: multi-GPU or multi-node. We just need to initialize a Dask cluster (`LocalCUDACluster`) and NVTabular will use the cluster to execute the workflow.**

## Here is the list of modules in the lab:

- **Creating a Local Cluster**<br> Learn how to create a GPU cluster and find the amount of resources available to your cluster.
- **Distributed GPU Arrays**<br> We will study how Dask Array provides chunked algorithms on top of Numpy-like libraries like Numpy and CuPy, enabling us to operate on more data.


# Creating a Local Cluster

The easiest way to scale workflows on a single node is to use the `LocalCUDACluster` API. This lets us create a GPU cluster, using one worker per GPU by default.

In [None]:
import numba
import warnings
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import nvtabular as nvt
from nvtabular.utils import pynvml_mem_size, device_mem_size
from dask.utils import parse_bytes

dask_workdir = "scratch/workdir"

The following code will automatically generate the parameters for the local CUDA cluster. It will infer the number of GPUs, calculate memory limits that work across a vast array of scenarios, and so on.

In [None]:
# Dask dashboard
dashboard_port = "8787"

# Deploy a Single-Machine Multi-GPU Cluster
protocol = "ucx"  # "tcp" or "ucx"
if numba.cuda.is_available():
    NUM_GPUS = list(range(len(numba.cuda.gpus)))
else:
    NUM_GPUS = []
visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Delect devices to place workers
device_limit_frac = 0.7  # Spill GPU-Worker memory to host at this limit.
device_pool_frac = 0.8
part_mem_frac = 0.15

# Use total device size to calculate args.device_limit_frac
device_size = device_mem_size(kind="total")
device_limit = int(device_limit_frac * device_size)
device_pool_size = int(device_pool_frac * device_size)
part_size = int(part_mem_frac * device_size)

# Check if any device memory is already occupied
for dev in visible_devices.split(","):
    fmem = pynvml_mem_size(kind="free", index=int(dev))
    used = (device_size - fmem) / 1e9
    if used > 1.0:
        warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")

cluster = None  # (Optional) Specify existing scheduler port

We can now initialize the CUDA cluster.

In [None]:
if cluster is None:
    cluster = LocalCUDACluster(
        protocol=protocol,
        n_workers=len(visible_devices.split(",")),
        CUDA_VISIBLE_DEVICES=visible_devices,
        device_memory_limit=device_limit,
        local_directory=dask_workdir,
        dashboard_address=":" + dashboard_port,
        rmm_pool_size=(device_pool_size // 256) * 256
    )

We can now start the local cluster.

Before we do so, please take a look at the options available to us in the `Client(...)` constructor. Instead of initializing a cluster locally, another option available to us is connecting to a remote CUDA cluster. Such cluster might not only include multiple GPUs, but can also span multiple nodes. This enables scaling to running on arbitrarily large data.

In [None]:
client = Client(cluster)
client.cluster

Workers provide two functions:

- Compute tasks as directed by the scheduler
- Store and serve computed results to other workers or clients


Each worker contains a ThreadPool that it uses to evaluate tasks as requested by the scheduler.
- It stores the results of these tasks locally and serves them to other workers or clients on demand. 
- If the worker is asked to evaluate a task for which it does not have all of the necessary data then it will reach out to its peer workers to gather the necessary dependencies.

Each worker sends computations to a thread in a concurrent.futures.ThreadPoolExecutor for computation. These computations occur in the same process as the Worker communication server so that they can access and share data efficiently between each other. For the purposes of data locality all threads within a worker are considered the same worker.

## Distributed GPU Arrays

Let's create a random matrix and calculate the singular value decomposition of it. This is a fairly complex calculation, so it's a great introduction to Dask. Dask can use `CuPy` to create random arrays.

In [None]:
import numpy as np
import dask
import dask.array as da
import cupy as cp

In [None]:
rs = da.random.RandomState(RandomState=cp.random.RandomState, seed=12)  # <-- we specify cupy here

x = rs.random((100000, 1000), chunks=(10000,1000))
x = x.persist() # so quick we don't need to wait

Notice the `persist` call. Like Apache Spark, Dask operations are lazy . Instead of being executed at that moment, most operations are added to a task graph and the actual evaluation is delayed until the result is needed.


Sometimes, though, we want to force the execution of operations. Calling `persist` on a Dask collection fully computes it (or actively computes it in the background), persisting the result into memory. When we’re using distributed systems, we may want to wait until persist is finished before beginning any downstream operations. We can enforce this contract by using `wait`. Wrapping an operation with `wait` will ensure it doesn’t begin executing until all necessary upstream operations have finished.

Let's look at our distributed array.

In [None]:
x

Dask's visual class representation shows us some information about this distributed array. We can see the size of the array, and of individual chunks, among other things. Remember, a Dask array is made up of individual CuPy or NumPy arrays.

Let's take the SVD now.

In [None]:
u, s, v = da.linalg.svd(x)

In [None]:
u

We've just added several hundred tasks to the task graph. We can call `persist` to execute it.

In [None]:
u, s, v = dask.persist(u, s, v)

Now we can take a look at the results.

In [None]:
u[:5, :5].compute()

That's all there is to it. Dask lets us take array workloads and scale up to as many machines as we have!

## Using an existing cluster

In [None]:
from dask.distributed import Client
client = Client('tcp://127.0.0.1:37615')

In [None]:
client

In [None]:
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster(protocol = 'ucx')

In [None]:
cluster.close()

# Conclusion

RAPIDS lets us scale up and take advantage of GPU acceleration. Dask lets us scale out to multiple machines. Dask supports both CuPy arrays and cuDF DataFrames, with generally the same APIs as the single-machine libraries. We encourage you to read the Dask [documentation](https://docs.dask.org/en/latest/) to learn more, and also look at our [10 Minute Guide to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/nightly/10min.html). After this brief introduction on Dask, we will learn how to use CuDF with Dask in detail in the next lab, and perform data operations on multiple levels.

## Licensing
  
This material is released by OpenACC-Standard.org, in collaboration with NVIDIA Corporation, under the Creative Commons Attribution 4.0 International (CC BY 4.0).