<img src='https://docs.dask.org/en/latest/_images/dask_horizontal.svg' width=400>

# Dask natively scales Python
**Dask provides advanced parallelism for analytics, enabling performance at scale for the tools you love**

_Integrates with existing projects_

_BUILT WITH THE BROADER COMMUNITY_

Dask is open source and freely available. It is developed in coordination with other community projects like Numpy, Pandas, and Scikit-Learn.

*(from the Dask project homepage at dask.org)*

* * *

__What Does This Mean?__
* Built in Python
* Scales *properly* from single laptops to 1000-node clusters
* Leverages and interops with existing Python APIs as much as possible
* Adheres to (Tim Peters') "Zen of Python" (https://www.python.org/dev/peps/pep-0020/) ... especially these elements:
    * Explicit is better than implicit.
    * Simple is better than complex.
    * Complex is better than complicated.
    * Readability counts. <i>[ed: that goes for docs, too!]</i>
    * Special cases aren't special enough to break the rules.
    * Although practicality beats purity.
    * In the face of ambiguity, refuse the temptation to guess.
    * If the implementation is hard to explain, it's a bad idea.
    * If the implementation is easy to explain, it may be a good idea.
* While we're borrowing inspiration, Dask also embodies one of Perl's slogans: making easy things easy and hard things possible
    * Specifically, it supports common data-parallel abstractions like Pandas and Numpy
    * But it also allows scheduling arbitrary custom computation that doesn't fit a preset mold

## First a little housekeeping

This cell captures the boilerplate code needed to set things up on `app.blazingsql.com`; the intention is to simplify running this notebook as a tutorial. Please run the next cell and then don't worry about it.

In [None]:
import dask.config
import getpass
dask.config.set({"distributed.dashboard.link": f"https://app.blazingsql.com/jupyter/user/{getpass.getuser()}/proxy/{{port}}/status"})
!mkdir -p data
!cd data && wget https://github.com/jacobtomlinson/dask-video-tutorial-2020/raw/master/data/beer_small.csv
!cd data && wget https://github.com/jacobtomlinson/dask-video-tutorial-2020/raw/master/data/pageviews_small.csv

## Dask Dataframes

Let's start with one common use case for Dask: scaling dataframes to 
* larger datasets (which don't fit in memory) and 
* multiple processes (which could be on multiple nodes)

In [None]:
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()
client = Client(cluster)

client

In [None]:
import dask_cudf

ddf = dask_cudf.read_csv('data/beer_small.csv', blocksize=12e7)

In [None]:
ddf

### What is this Dask Dataframe?

A large, virtual dataframe divided along the index into multiple dataframes. 

_As we are working with RAPIDS here we will be using [cuDF](https://docs.rapids.ai/api/cudf/stable/) as our sub-dataframe type instead of the traditional Pandas._
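
To make the idea of partitions concrete, here is a minimal sketch in plain Python (no Dask, cuDF, or Pandas) of how a grouped mean can be computed one partition at a time and then combined. The toy rows and the helper names `partial_agg` and `combine` are hypothetical, and this is not how Dask implements its groupby internally; it only illustrates the split-then-combine pattern.

```python
from collections import defaultdict

# Hypothetical toy data: (group, value) rows already split into two partitions.
partitions = [
    [("A", 4.0), ("B", 3.0)],
    [("A", 5.0), ("B", 4.0), ("A", 3.0)],
]

def partial_agg(rows):
    """Per-partition step: keep a running [sum, count] for each group."""
    acc = defaultdict(lambda: [0.0, 0])
    for key, value in rows:
        acc[key][0] += value
        acc[key][1] += 1
    return acc

def combine(partials):
    """Merge the per-partition results, then finish the mean for each group."""
    total = defaultdict(lambda: [0.0, 0])
    for part in partials:
        for key, (s, n) in part.items():
            total[key][0] += s
            total[key][1] += n
    return {key: {"mean": s / n, "count": n} for key, (s, n) in total.items()}

# Each partial_agg call only needs one partition in memory at a time.
result = combine(partial_agg(p) for p in partitions)
print(result)
```

Because each partition is reduced to a small summary before anything is combined, the per-partition steps are independent of each other and could run on different workers.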

<img src="https://docs.dask.org/en/latest/_images/dask-dataframe.svg" width="400px">

In [None]:
ddf.map_partitions(type).compute()

In [None]:
ddf.head()

In [None]:
ddf[ddf.beer_style.str.contains('IPA')].head()

In [None]:
ipa = ddf[ddf.beer_style.str.contains('IPA')]

In [None]:
mean_ipa_review = ipa.groupby('brewery_name').review_overall.agg(['mean','count'])

In [None]:
mean_ipa_review.compute()

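`compute` doesn't just run the work; it also collects the result into a single, regular (non-Dask) dataframe right here in our initial Python process. Because we read the data with `dask_cudf`, that result is a cuDF dataframe; with plain `dask.dataframe` it would be a Pandas dataframe.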

Having a local result is convenient, but if we are generating large results, we may want (or need) to produce output in parallel to the filesystem, instead. 

There are writing counterparts to read methods which we can use:

- `read_csv` / `to_csv`
- `read_hdf` / `to_hdf`
- `read_json` / `to_json`
- `read_parquet` / `to_parquet`

In [None]:
mean_ipa_review.to_csv('ipa-*.csv') #the * is where the partition number will go

### Another dataframe example

In [None]:
client.restart()

__Read data__ from the `pageviews_small.csv` file. Use Dask's `blocksize=` parameter to limit each partition to a maximum of 100 MB.

In [None]:
import dask_cudf

ddf = dask_cudf.read_csv('data/pageviews_small.csv', sep=' ', blocksize=10e7)

ddf

__Change the column names__ to `project`, `page`, `requests`, and `x` then drop the `x` column.

In [None]:
ddf.columns = ['project', 'page', 'requests', 'x']

ddf2 = ddf.drop('x', axis=1)

ddf2

__Filter__ for `project` matching "en" (English Wikipedia)

In [None]:
ddf3 = ddf2[ddf2.project == 'en']
ddf3

__Count__ how many pages were accessed from English Wikipedia vs. all projects in this dataset. (Note: each project/page combination appears on a unique line, so this amounts to just counting records)

In [None]:
ddf2.count().compute() #all

In [None]:
ddf3.count().compute() #English

__Show the record counts__ for English (en), French (fr), Chinese (zh), and Polish (pl).

In [None]:
ddf4 = ddf2.groupby('project').count().reset_index()

ddf4[ddf4.project.isin(['en', 'fr', 'zh', 'pl'])].compute()

## Dask Dashboard

Here we'll revisit that code one more time.

But this time, we'll focus less on getting the answers, and more on seeing what Dask is doing.

Specifically, we'll look at some of the elements of the Dask Dashboard GUI.

In [None]:
client.restart()

The dashboard is available at the URL above.

When we __Read data__ you'll notice that nothing happens in the Dask GUI widgets, because these operations are just setting up a compute graph which will be executed later:

In [None]:
import dask_cudf

ddf = dask_cudf.read_csv('data/pageviews_small.csv', sep=' ', blocksize=10e7)

ddf.columns = ['project', 'page', 'requests', 'x']

When we __Count__ (and `.compute()`) all the records, we'll see tasks get scheduled. __Before__ running this command, note memory, CPU, etc. in the GUI

In [None]:
ddf.drop('x', axis=1).count().compute() #all

The GUI tells us quite a lot about what's happened. If you really want to see how the computation was decomposed by Dask, you can render a task graph before executing by calling `.visualize()` on the lazy result instead of `.compute()` (although you won't normally need to do this).

### Let's look at Dask's Profile View

Note: almost all of Dask's dashboard views update in realtime. The Profile View __does not__. Although Dask is collecting perf data behind the scenes, the profiler timeline doesn't update until you click the "Update" button. 

At that point you can select a time period from the refreshed timeline, and Dask will render a flame graph from that selected period.

## Dask Array

Depending on the focus of your work, Dask Array is likely to be the first interface you use for Dask after Dataframe ... or perhaps just the first interface you use (e.g., if you work primarily with NumPy).

Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using all of our cores. We coordinate these blocked algorithms using Dask graphs.

Dask arrays coordinate many NumPy arrays arranged into a grid. These NumPy arrays may live on disk or on other machines.

_Again as we are working with RAPIDS here we will be using [CuPy](https://cupy.dev/) as our sub-array type instead of the traditional NumPy._

<img src="https://docs.dask.org/en/latest/_images/dask-array-black-text.svg">

## Dask Arrays

- Dask arrays are chunked, n-dimensional arrays
- You can think of a Dask array as a collection of `ndarray` arrays
- Dask arrays implement a large subset of the NumPy API using blocked algorithms
- For many purposes Dask arrays can serve as drop-in replacements for NumPy arrays

In [None]:
client.restart()

In [None]:
import cupy as cp
import dask.array as da

In [None]:
a_cp = cp.arange(1, 50, 3)
a_cp

In [None]:
a_da = da.arange(1, 50, 3, chunks=5, like=cp.empty(0))
a_da

In [None]:
print(a_da.dtype)
print(a_da.shape)

In [None]:
print(a_da.chunks)
print(a_da.chunksize)
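
The chunk layout reported above should follow from simple arithmetic. The sketch below uses a hypothetical helper, `chunk_sizes` (not part of the Dask API), and assumes a 1-D array is cut into full blocks of the requested size followed by one shorter remainder block.

```python
def chunk_sizes(n, chunk):
    """Split a 1-D length n into blocks of `chunk`, plus a shorter last block if needed."""
    full, rest = divmod(n, chunk)
    return (chunk,) * full + ((rest,) if rest else ())

n = len(range(1, 50, 3))   # same start/stop/step as the arange call above
sizes = chunk_sizes(n, 5)  # same chunk size as chunks=5
print(n, sizes)
```

Under that assumption, the 17 elements end up in three blocks of 5 and one block of 2.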

In [None]:
a_da ** 2

In [None]:
(a_da ** 2).compute()

In [None]:
type((a_da ** 2).compute())

Dask arrays support a large portion of the NumPy interface:

- Arithmetic and scalar mathematics: `+`, `*`, `exp`, `log`, ...

- Reductions along axes: `sum()`, `mean()`, `std()`, `sum(axis=0)`, ...

- Tensor contractions / dot products / matrix multiply: `tensordot`

- Axis reordering / transpose: `transpose`

- Slicing: `x[:100, 500:100:-2]`

- Fancy indexing along single axes with lists or numpy arrays: `x[:, [10, 1, 5]]`

- Array protocols like `__array__` and `__array_ufunc__`

- Some linear algebra: `svd`, `qr`, `solve`, `solve_triangular`, `lstsq`, ...

- ...

See the [Dask array API docs](http://docs.dask.org/en/latest/array-api.html) for full details about what portion of the NumPy API is implemented for Dask arrays.

### Blocked Algorithms

Dask arrays are implemented using _blocked algorithms_. These algorithms break up a computation on a large array into many computations on smaller pieces of the array. This minimizes the memory load (amount of RAM) of computations and allows for working with larger-than-memory datasets in parallel.
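
As a rough illustration of a blocked algorithm, the following plain-Python sketch sums a sequence block by block and then reduces the partial results. The helper `blocks` is hypothetical and a list stands in for the array; Dask's own implementation is more general, but the shape of the computation is the same.

```python
def blocks(seq, size):
    """Yield consecutive slices of `seq` containing at most `size` items."""
    for start in range(0, len(seq), size):
        yield seq[start:start + size]

data = list(range(20))                             # stand-in for a 20-element array
partial_sums = [sum(b) for b in blocks(data, 5)]   # one small, independent task per block
total = sum(partial_sums)                          # final reduction over the partial results
print(partial_sums, total)
```

Only one block has to be in memory for each partial sum, and the partial sums do not depend on each other, which is what makes the work parallelizable.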

In [None]:
rs = da.random.RandomState(RandomState=cp.random.RandomState)

In [None]:
x = rs.random(20, chunks=5)
x

In [None]:
result = x.sum()
result

In [None]:
result.compute()

Dask supports a large portion of the NumPy API. This can be used to build up more complex computations using the familiar NumPy operations you're used to.

In [None]:
x = rs.random(size=(15, 15), chunks=(10, 5))
x

In [None]:
result = (x + x.T).sum()
result

In [None]:
result.compute()

## Diving deeper with Dask Delayed

*More detailed docs are online at:*
* https://docs.dask.org/en/latest/delayed.html
* https://docs.dask.org/en/latest/futures.html

Sometimes problems don’t fit nicely into one of the high-level collections like Dask arrays or Dask DataFrames. In these cases, you can parallelize custom algorithms using the Dask `delayed` interface. This allows one to manually create task graphs with a light annotation of normal Python code.

In [None]:
import time

def inc(x):
    time.sleep(0.5)
    return x + 1

def double(x):
    time.sleep(0.5)
    return 2 * x

def add(x, y):
    time.sleep(0.5)
    return x + y

In [None]:
%%time

data = [1, 2, 3, 4]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)

Dask `delayed` wraps function calls and delays their execution. Rather than computing results immediately, `delayed` functions record what we want to compute as a task into a graph that we’ll run later on parallel hardware by calling `compute`.
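
To get a feel for what "recording a call instead of running it" means, here is a toy stand-in written in plain Python. The `Lazy` class, the `lazy` wrapper, and `plus_one` are hypothetical names for illustration only; this is not Dask's implementation, and it has no scheduler or parallelism.

```python
class Lazy:
    """Record a function call instead of running it (toy stand-in for a delayed task)."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Resolve any upstream Lazy arguments first, then call the function.
        resolved = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*resolved)

def lazy(func):
    """Wrap func so that calling it builds a Lazy node rather than executing."""
    return lambda *args: Lazy(func, *args)

calls = []  # records which inputs the function has actually been run on

def plus_one(x):
    calls.append(x)
    return x + 1

node = lazy(plus_one)(lazy(plus_one)(3))   # builds a two-step graph; nothing runs yet
ran_before_compute = list(calls)           # snapshot taken before compute()
result = node.compute()                    # runs plus_one(3), then plus_one(4)
print(ran_before_compute, result, calls)
```

The real `delayed` interface adds what this sketch leaves out: a task graph that a scheduler can inspect, so that independent tasks can run in parallel.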

In [None]:
from dask import delayed

In [None]:
lazy_inc = delayed(inc)

In [None]:
inc_output = lazy_inc(3)  #inc(3)
inc_output

In [None]:
# inc_output.visualize()

In [None]:
inc_output.compute()

Using delayed functions, we can build up a task graph for the particular computation we want to perform

In [None]:
double_inc_output = lazy_inc(inc_output)
double_inc_output

In [None]:
# double_inc_output.visualize()

In [None]:
double_inc_output.compute()

We can use `delayed` to make our previous example computation lazy by wrapping all the function calls with delayed

In [None]:
@delayed
def inc(x):
    time.sleep(0.5)
    return x + 1

@delayed
def double(x):
    time.sleep(0.5)
    return 2 * x

@delayed
def add(x, y):
    time.sleep(0.5)
    return x + y

Now `add` returns a `Delayed` object which you can call `compute()` on at a later time

In [None]:
%%time

data = [1, 2, 3, 4]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = delayed(sum)(output)
total

In [None]:
# total.visualize()

In [None]:
%%time

total.compute()

Check out the [Dask delayed best practices](http://docs.dask.org/en/latest/delayed-best-practices.html) page to avoid some common problems when using `delayed`. 

## `concurrent.futures` interface

The Dask distributed scheduler implements a superset of Python's [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html) interface that allows for finer control and asynchronous computation.

The `submit` function sends a function and its arguments to the distributed scheduler for processing. It returns a `Future` object that refers to remote data on the cluster. The call returns immediately while the computation runs remotely in the background, so the local Python session is not blocked.
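
For comparison, the same submit-and-collect pattern can be sketched with the standard library's `concurrent.futures` alone. This example uses a local `ThreadPoolExecutor` as a stand-in for the Dask `client`, and `slow_inc` is a hypothetical function; nothing here runs on a cluster.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_inc(x):
    time.sleep(0.1)   # stand-in for real work
    return x + 1

with ThreadPoolExecutor(max_workers=4) as pool:
    future = pool.submit(slow_inc, 7)   # returns a Future without waiting for the work
    value = future.result()             # blocks until the result is ready

    # Submit several tasks, then collect results in whatever order they finish.
    futures = [pool.submit(slow_inc, i) for i in range(4)]
    results = sorted(f.result() for f in as_completed(futures))

print(value, results)
```

With Dask, the `submit` and `result` calls look the same, but the work is sent to the distributed scheduler rather than to local threads.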

In [None]:
import random

def inc(x):
    time.sleep(random.uniform(0, 2))
    return x + 1

In [None]:
f = client.submit(inc, 7.2)  # Submits inc(7.2) to the distributed scheduler
print(f)
print(type(f))

Once the computation for the `Future` is complete, you can retrieve the result using the `.result()` method

In [None]:
print(f)

In [None]:
f.result()

To delete `Futures` in distributed memory, use the `del` keyword.

In [None]:
del f

The `map` function can be used to apply a function on a sequence of arguments (similar to the built-in Python `map` function).

In [None]:
data = range(10)
futures = client.map(inc, data)
futures

Here a list of `Futures` is returned, one for each item in the sequence of arguments.

In [None]:
futures

In [None]:
results = client.gather(futures)
# Same as results = [future.result() for future in futures]

In [None]:
results

Notice what happens if we run the same calculation:

In [None]:
data = range(10)
futures = client.map(inc, data)
futures

The results are ready right away ... and ... the keys are the same. That's because all of the same objects are involved, and the results are still in the cluster memory.

The `concurrent.futures` API even allows you to submit tasks based on the output of other tasks. This gives more flexibility in situations where the computations may evolve over time.

```python
from dask.distributed import as_completed

seq = as_completed(futures)

for future in seq:
    y = future.result()
    if condition(y):
        new_future = client.submit(...)
        seq.add(new_future)  # add back into the loop
```

## dask_ml (and cuml)

Building on these primitives Dask-ML provides scalable machine learning in Python using [Dask](https://dask.org/) alongside popular machine learning libraries like [Scikit-Learn](http://scikit-learn.org/).

The idea is to support Pandas + Scikit style ML for parallel scenarios, with code patterns you're used to:

```python
import dask.dataframe as dd
df = dd.read_parquet('...')
data = df[['age', 'income', 'married']]
labels = df['outcome']

from dask_ml.linear_model import LogisticRegression
lr = LogisticRegression()
lr.fit(data, labels)
```

Modern machine learning algorithms employ a wide variety of techniques. Scaling these requires a similarly wide variety of different approaches. Generally solutions fall into the following three categories:

### Parallelize Scikit-Learn Directly

Scikit-Learn already provides parallel computing on a single machine with [Joblib](http://joblib.readthedocs.io/en/latest/). Dask extends this parallelism to many machines in a cluster. This works well for modest data sizes but large computations, such as random forests, hyper-parameter optimization, and more.

```python
from dask.distributed import Client
import joblib

client = Client()  # Connect to a Dask Cluster

with joblib.parallel_backend('dask'):
    # Your normal scikit-learn code here
```

See [Dask-ML Joblib documentation](https://ml.dask.org/joblib.html) for more information.

*Note that this is an active collaboration with the Scikit-Learn development team. This functionality is progressing quickly but is in a state of rapid change.*

### Reimplement Scalable Algorithms with Dask Array

Some machine learning algorithms are easy to write down as Numpy algorithms. In these cases we can replace Numpy arrays with Dask arrays to achieve scalable algorithms easily. This is employed for [linear models](https://ml.dask.org/glm.html), [pre-processing](https://ml.dask.org/preprocessing.html), and [clustering](https://ml.dask.org/clustering.html).

```python
from dask_ml.preprocessing import Categorizer, DummyEncoder
from dask_ml.linear_model import LogisticRegression

lr = LogisticRegression()
lr.fit(data, labels)
```

### Partner with other distributed libraries

Other machine learning libraries like XGBoost and TensorFlow already have distributed solutions that work quite well. Dask-ML makes no attempt to re-implement these systems. Instead, Dask-ML makes it easy to use normal Dask workflows to prepare and set up data, then it deploys XGBoost or Tensorflow *alongside* Dask, and hands the data over.

```python
from dask_ml.xgboost import XGBRegressor

est = XGBRegressor(...)
est.fit(train, train_labels)
```

See [Dask-ML + XGBoost](https://ml.dask.org/xgboost.html) or [Dask-ML + TensorFlow](https://ml.dask.org/tensorflow.html) documentation for more information.

### cuML

[cuML](https://docs.rapids.ai/api/cuml/stable/) extends work done in Dask and Dask-ML to bring distributed execution to GPU accelerated algorithms.

cuML is a suite of fast, GPU-accelerated machine learning algorithms designed for data science and analytical tasks. Its API mirrors Scikit-Learn's, giving practitioners the easy fit-predict-transform paradigm without ever having to program on a GPU.

The `cuml.dask` subpackage takes some of the GPU accelerated algorithms and provides [multi-node multi-GPU](https://docs.rapids.ai/api/cuml/stable/api.html#multi-node-multi-gpu-algorithms) implementations which use Dask to scale out over many machines and GPUs.

## Distributed

As we covered at the beginning, Dask has the ability to run work on multiple machines using the distributed scheduler.

Until now we have actually been using the distributed scheduler for our work, but just on a single machine.

When we instantiate a `LocalCUDACluster()` object with no arguments it will attempt to locate all GPUs and launch one Dask GPU worker per GPU.

We then pass this cluster object to our `Client()` in order for work to be executed on our cluster.

Let's explore the `LocalCUDACluster` object ourselves and see what it is doing.

In [None]:
LocalCUDACluster?

Our cluster object has attributes and methods which we can use to access information about our cluster. For instance we can get the log output from the scheduler and all the workers with the `get_logs()` method.

In [None]:
cluster.get_logs()

We can access the url that the Dask dashboard is being hosted at.

In [None]:
cluster.dashboard_link

## Scalable clusters

The `LocalCUDACluster` is a great tool for launching a Dask cluster on a single machine with one or more GPUs. But as your workloads grow you may need to spread to multiple machines. Dask has many subprojects which handle provisioning clusters on various platforms.

We currently have cluster managers for [Kubernetes](https://kubernetes.dask.org/en/latest/), [Hadoop/Yarn](https://yarn.dask.org/en/latest/), [cloud platforms](https://cloudprovider.dask.org/en/latest/) and [batch systems including PBS, SLURM and SGE](http://jobqueue.dask.org/en/latest/).

These cluster managers allow users who have access to resources such as these to bootstrap Dask clusters on to them. If an institution wishes to provide a central service that users can request Dask clusters from there is also [Dask Gateway](https://gateway.dask.org/).

With some cluster managers it is possible to increase and decrease the number of workers, either by calling `cluster.scale(n)` in your code, where `n` is the desired number of workers, or by letting Dask do this dynamically with `cluster.adapt(minimum=1, maximum=100)`, where `minimum` and `maximum` are your preferred limits for Dask to abide by.

It is always good to keep your minimum at 1 or above, as Dask will start running work on a single worker in order to profile how long things take and extrapolate how many additional workers it thinks it needs. Getting new workers may take time depending on your setup, so keeping the minimum at 1 or above means this profiling will start immediately.

## Best Practices and Wrapup

The Dask docs collect a number of best practices:
* Dataframe: https://docs.dask.org/en/latest/dataframe-best-practices.html
* Array: https://docs.dask.org/en/latest/array-best-practices.html
* Delayed: https://docs.dask.org/en/latest/delayed-best-practices.html 
* Overall: https://docs.dask.org/en/latest/best-practices.html

### Partitions/Chunks and Tasks

Remember that Dask is a scheduler for regular Python functions operating on (and producing) regular Python objects.

Your partitions, chunks, or data segments should be small enough to comfortably fit in RAM for each worker thread/core.

That is...
* if you have a 1 GB worker with 1 core, you want to keep your partitions below 1 GB
* with 2 x 1 GB workers with 1 core each, we still want partitions below 1 GB
* with n x 4 GB workers with 2 cores per worker, we want partitions below 2 GB

It's also good to take into account that more memory may be used for operations than the data chunk size itself, and that it's helpful to have a few chunks of data available to keep Dask's worker cores busy. 

So we might want to take those numbers above and make them 2-4x smaller (or, equivalently, create 2-4x as many partitions).
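
The rule of thumb above can be written down as a small calculation. The function name `max_partition_gb` and its `safety_factor` parameter are hypothetical, and the formula is just the heuristic from this section (worker memory divided by cores, then shrunk by a 2-4x margin), not a Dask API.

```python
def max_partition_gb(worker_memory_gb, cores_per_worker, safety_factor=1):
    """Upper bound on partition size so that every core can hold one partition in RAM."""
    return worker_memory_gb / cores_per_worker / safety_factor

one_core_worker = max_partition_gb(1, 1)                   # 1 GB worker, 1 core
two_core_worker = max_partition_gb(4, 2)                   # 4 GB worker, 2 cores
with_headroom = max_partition_gb(4, 2, safety_factor=4)    # same worker, 4x margin
print(one_core_worker, two_core_worker, with_headroom)
```

Note that the number of workers does not appear in the formula: adding workers adds capacity for more partitions, but does not change how large a single partition can be.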

Generally speaking, a lot of tasks is not a bad thing. Scheduling overhead for each additional task is typically less than 1 millisecond, and can be a lot less.

That said, if you have, say, a million tasks, those milliseconds can add up to minutes (and with a billion tasks, to days). In that case you may want to simplify your task graph or use larger (and hence fewer) partitions/chunks.
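
A quick back-of-the-envelope calculation shows how per-task overhead scales. It assumes the 1 millisecond upper bound quoted above applies to every task, which is pessimistic; `scheduling_overhead_seconds` is a hypothetical helper, not a Dask function.

```python
def scheduling_overhead_seconds(n_tasks, per_task_seconds=1e-3):
    """Total scheduling overhead if every task costs `per_task_seconds`."""
    return n_tasks * per_task_seconds

million = scheduling_overhead_seconds(1_000_000)
billion = scheduling_overhead_seconds(1_000_000_000)
print(f"1 million tasks: about {million / 60:.1f} minutes")
print(f"1 billion tasks: about {billion / 86_400:.1f} days")
```

Under that assumption, a million tasks cost roughly 17 minutes of scheduling overhead, while a billion would cost more than 11 days.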

### Caching (Persistence)

The results of computations can be cached in the cluster memory, so that they are available for reuse, or for use to derive subsequent results.

(See: `persist` which is available on `Client`, `Bag`, `Array`, `Dataframe`, etc.)

Use caching wisely (not indiscriminately) and monitor memory usage using the `Workers` and `Memory` dashboard panes.

### Data Formats and Compression

Use compression schemes which are *splittable* and allow random access, so that processing your files in parallel is more flexible, e.g., Snappy, LZ4 instead of gzip.

For datasets, consider files (and collections of files) in Parquet, ORC, HDF5, etc.

