# Dask Overview

Dask is a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple. On the CPU, Dask uses Pandas to execute operations in parallel on DataFrame partitions, and NumPy to do the same on array chunks.

Dask-cuDF extends Dask where necessary to allow its DataFrame partitions to be processed by cuDF GPU DataFrames as opposed to Pandas DataFrames. For instance, when you call `dask_cudf.read_csv(…)`, your cluster’s GPUs do the work of parsing the CSV file(s) with the underlying `cudf.read_csv()`. Dask also supports array-based workflows using CuPy.

## When to use Dask
If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF or CuPy. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask.

One additional benefit Dask provides is that it lets us easily spill data between device and host memory. This can be very useful when we need to do work that would otherwise cause out of memory errors.

In this brief notebook, you'll walk through an example of using Dask on a single GPU. Because we're using Dask, the same code in this notebook would work on 2, 8, 16, or hundreds of GPUs.

# Creating a Local Cluster

The easiest way to scale workflows on a single node is to use the `LocalCUDACluster` API. This lets us create a GPU cluster, using one worker per GPU by default. In this case, we limit this to a single GPU since we only have one to work with.

We also set a `device_memory_limit` to illustrate how we can spill data between GPU and CPU memory. Artificial memory limits like this reduce our performance if we don't actually need them, but can let us accomplish much larger tasks when we do.

In [None]:
from dask.distributed import Client, fire_and_forget, wait
from dask_cuda import LocalCUDACluster
from dask.utils import parse_bytes

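cluster = LocalCUDACluster(
    CUDA_VISIBLE_DEVICES="0",               # restrict the cluster to the first GPU
    dashboard_address=':8887',              # serve the Dask dashboard on port 8887
    device_memory_limit=parse_bytes("4GB")  # threshold at which data spills from GPU to CPU memory
)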
client = Client(cluster)
client

Please open {$IP_ADDRESS}:8887 in a new tab to watch Dask execute the following tasks.
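
As a small aside on the `device_memory_limit` value above: `dask.utils.parse_bytes` turns a human-readable size string into a byte count. The sketch below only assumes that `dask` is installed (no GPU is needed) and shows that `"4GB"` is read as decimal gigabytes, while `"4GiB"` would be the binary variant.

```python
from dask.utils import parse_bytes

# "GB" is a decimal unit (10**9 bytes); "GiB" is a binary unit (2**30 bytes).
limit_decimal = parse_bytes("4GB")
limit_binary = parse_bytes("4GiB")

print(limit_decimal)  # 4000000000
print(limit_binary)   # 4294967296
```

Either form can be passed as the limit; the decimal form is the one used in this notebook.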

## Distributed GPU Arrays

Let's create a random matrix and calculate the singular value decomposition of it. This is a fairly complex calculation, so it's a great introduction to Dask. Dask can use `CuPy` to create random arrays.

In [None]:
import cupy as cp
import dask
import dask.array as da

In [None]:
rs = da.random.RandomState(RandomState=cp.random.RandomState, seed=12)  # <-- we specify cupy here

x = rs.random((100000, 1000), chunks=(1000,1000))
x = x.persist() # so quick we don't need to wait

Notice the `persist` call. Like Apache Spark, Dask operations are lazy. Instead of being executed at that moment, most operations are added to a task graph and the actual evaluation is delayed until the result is needed.


Sometimes, though, we want to force the execution of operations. Calling `persist` on a Dask collection fully computes it (or actively computes it in the background), persisting the result into memory. When we’re using distributed systems, we may want to wait until `persist` is finished before beginning any downstream operations. We can enforce this by using `wait`: calling `wait` on a persisted collection blocks until all of its underlying tasks have finished, so nothing after that line runs on partial results.
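
The difference between building a graph and executing it can be seen in a minimal CPU-only sketch. It assumes `dask` and NumPy are installed and uses a small NumPy-backed array rather than the CuPy-backed one in this notebook; the array shape is an illustrative choice.

```python
import dask.array as da

# Building the expression only records tasks; nothing is computed yet.
x = da.ones((4000, 1000), chunks=(1000, 1000))
total = (x + 1).sum()
print(type(total).__name__)  # still a lazy Dask Array

# compute() executes the graph and returns a concrete value.
result = total.compute()
print(result)

# persist() executes the graph but keeps the result as a Dask collection
# whose chunks are already held in memory.
y = (x + 1).persist()
print(type(y).__name__)
```

Without a distributed `Client`, `persist` blocks until the work is done. With the distributed scheduler used in this notebook it returns immediately, which is why `wait` is useful there.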

Let's look at our distributed array.

In [None]:
x

Dask's visual class representation shows us some information about this distributed array. We can see the size of the array, and of individual chunks, among other things. Remember, a Dask array is made up of individual CuPy or NumPy arrays.
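
The numbers in that representation follow directly from the shape and chunk sizes. The arithmetic below is a rough sketch using only the standard library; it assumes 8-byte `float64` elements, which is the usual default for random floats but is not stated explicitly in this notebook.

```python
import math

shape = (100_000, 1_000)   # full array, as created above
chunks = (1_000, 1_000)    # size of each chunk
itemsize = 8               # bytes per element, assuming float64

# Number of chunks along each axis, multiplied together.
n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))
chunk_bytes = math.prod(chunks) * itemsize
total_bytes = math.prod(shape) * itemsize

print(n_chunks)             # 100
print(chunk_bytes / 1e6)    # 8.0  (MB per chunk)
print(total_bytes / 1e6)    # 800.0  (MB in total)
```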

Let's take the SVD now.

In [None]:
u, s, v = da.linalg.svd(x)

In [None]:
u

Nothing happened? Right. Dask is lazy. We've just added several hundred tasks to the task graph. We can call `persist` to execute it.

In [None]:
u, s, v = dask.persist(u, s, v)
_ = wait(u)

Now we can take a look at the results.

In [None]:
u[:5, :5].compute()

That's all there is to it. Dask lets us take array workloads and scale out to as many GPUs and machines as we have!
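
One way to sanity-check a decomposition like this is to rebuild the input from its factors. The sketch below is a CPU-only illustration, assuming `dask` and NumPy are installed; it uses a much smaller NumPy-backed matrix than the one above, and the names `x_np`, `reconstructed`, and `max_error` are introduced here for illustration only.

```python
import numpy as np
import dask.array as da

# A small tall-and-skinny matrix, chunked along rows only.
x_np = np.random.RandomState(12).random_sample((1000, 10))
x = da.from_array(x_np, chunks=(100, 10))

u, s, v = da.linalg.svd(x)

# Scaling the columns of u by s and multiplying by v should recover x.
reconstructed = (u * s) @ v
max_error = float(abs(reconstructed - x).max().compute())

print(max_error < 1e-8)  # True
```

The same check should carry over to the GPU-backed arrays, since only the chunk type changes, though that is not run here.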

## cuDF DataFrames to Dask DataFrames

Dask also lets us scale DataFrame workflows. We'll walk through a couple of examples below, and then also highlight how Dask lets us spill data from GPU to CPU memory.

First, we'll create a DataFrame with CPU Dask and then send it to the GPU.

In [None]:
import cudf
import dask_cudf

In [None]:
ddf = dask_cudf.from_dask_dataframe(dask.datasets.timeseries())
ddf.head()

### Example One: Groupby-Aggregations

In [None]:
ddf.groupby(["id", "name"]).agg({"x":['sum', 'mean']}).head()

Run the code above again.

If you look at the task stream in the dashboard, you'll notice that we're creating the data every time. That's because Dask is lazy. We need to `persist` the data if we want to cache it in memory.

In [None]:
ddf = ddf.persist()
wait(ddf);

In [None]:
ddf.groupby(["id", "name"]).agg({"x":['sum', 'mean']}).head()

This is the same API as cuDF, except it works across many GPUs.

### Example Two: Rolling Windows

We can also do things like rolling window calculations with Dask and GPUs.
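
To make the window semantics concrete, here is a plain-Python sketch of a trailing rolling mean. It is not how cuDF or Dask implement it; it only illustrates what a window of 3 means, assuming the Pandas-style default where rows without a full window produce a missing value. The helper name `rolling_mean` is hypothetical.

```python
def rolling_mean(values, window):
    """Trailing rolling mean; positions without a full window are None."""
    out = []
    for i in range(len(values)):
        if i + 1 < window:
            # Not enough preceding rows to fill the window yet.
            out.append(None)
        else:
            chunk = values[i + 1 - window : i + 1]
            out.append(sum(chunk) / window)
    return out

result = rolling_mean([1.0, 2.0, 3.0, 4.0, 5.0], window=3)
print(result)  # [None, None, 2.0, 3.0, 4.0]
```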

In [None]:
ddf.head()

In [None]:
rolling = ddf[['x','y']].rolling(window=3)
type(rolling)

In [None]:
rolling.mean().head()

## Larger than GPU Memory Workflows

What if we needed to scale up even more, but didn't have enough GPUs? Dask handles spilling for us, so we don't have to manage it ourselves. The `device_memory_limit` parameter we used while creating the `LocalCUDACluster` determines when we should start spilling. In this case, we'll start spilling when we've used about 4GB of GPU memory.

To demonstrate, run the following cells and then switch to the dashboard. Watch what happens. We've used a decent chunk of memory already (about 6GB), so we'll need to spill to do large computations.

In [None]:
!nvidia-smi | head -10

In [None]:
rs = da.random.RandomState(RandomState=cp.random.RandomState, seed=12)

x = rs.random((500000, 1000), chunks=(5000,1000))
x

In [None]:
u, s, v = da.linalg.svd(x)
u, s, v = dask.persist(u, s, v)
_ = wait(u)

In [None]:
u[:5, :5].compute()

Watch the Dask Dashboard while this runs. You should see a lot of tasks in the stream like `disk-read` and `disk-write`. Setting a `device_memory_limit` tells Dask to spill to CPU memory and potentially disk. This lets us do these large computations even when we're almost out of memory (though in this case, we faked it).
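
A back-of-the-envelope estimate shows why this workload has to spill. The sketch below uses only the standard library and assumes 8-byte `float64` elements and a limit of 4 decimal gigabytes; the helper `estimate_bytes` is hypothetical and introduced just for this calculation.

```python
def estimate_bytes(shape, itemsize=8):
    """Approximate in-memory size of a dense array (float64 assumed)."""
    n_elements = 1
    for dim in shape:
        n_elements *= dim
    return n_elements * itemsize

device_limit = 4 * 10**9                       # "4GB" read as decimal gigabytes
x_bytes = estimate_bytes((500_000, 1_000))     # the input matrix
chunk_bytes = estimate_bytes((5_000, 1_000))   # one chunk of it

print(x_bytes / 1e9)            # 4.0  (GB for the input alone)
print(chunk_bytes / 1e6)        # 40.0 (MB per chunk)
print(x_bytes >= device_limit)  # True
```

The input matrix on its own already reaches the limit, before counting `u`, which has the same shape, or anything left in memory from earlier cells.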

# Summary

RAPIDS lets us scale up and take advantage of GPU acceleration. Dask lets us scale out to multiple machines. Dask supports both CuPy arrays and cuDF DataFrames, with generally the same APIs as the single-machine libraries.

We encourage you to read the Dask [documentation](https://docs.dask.org/en/latest/) to learn more, and to look at our [10 Minute Guide to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/nightly/10min.html).