# A (very) quick intro to Dask

**Dask** is a very popular library for parallel and distributed computing in Python. It has two components:

- **Dynamic task scheduling** optimized for computation. This is similar to [Apache Airflow](https://airflow.apache.org/), Luigi, Celery, or Make, but optimized for interactive computational workloads.
- **Large data structures** like NumPy arrays (Dask Array), Pandas dataframes (Dask DataFrame), and Python lists (Dask Bag, similar to PySpark) that work in larger-than-memory or distributed environments. These parallel collections run on top of the dynamic task schedulers.
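To give a flavour of the task-scheduling side, here is a minimal sketch using `dask.delayed`. It wraps ordinary Python functions so that calling them records tasks in a graph instead of running them. `inc` and `add` are hypothetical stand-ins for real work:

```python
import dask

@dask.delayed
def inc(x):
    # Stand-in for an expensive step; runs only when the graph is computed.
    return x + 1

@dask.delayed
def add(x, y):
    return x + y

# Calling delayed functions only records tasks; nothing is executed yet.
a = inc(1)
b = inc(2)
total = add(a, b)

# The scheduler is free to run inc(1) and inc(2) in parallel, then add().
result = total.compute()
print(result)  # 5
```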

Read more about it:

- [Why Dask?](https://docs.dask.org/en/latest/why.html)
- [Examples](https://examples.dask.org/)

## The Dashboard

Starting a `Client` is optional, but a good idea while you're learning: it gives you a live dashboard showing what the workers are doing.

```python
from dask.distributed import Client

client = Client(processes=False,
                threads_per_worker=8,
                n_workers=1,
                memory_limit='4GB')
client
```

| Item | Value |
|---|---|
| Cluster type | LocalCluster (connection method: Cluster object) |
| Dashboard | http://192.168.2.73:8787/status |
| Status | running |
| Using processes | False |
| Workers | 1 |
| Total threads | 8 |
| Total memory | 3.73 GiB |
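If you skip the `Client`, Dask still runs in parallel: `dask.array` falls back to a local thread pool. You can also pick a scheduler for a single call with the `scheduler=` keyword of `compute()`; the single-threaded `"synchronous"` scheduler is handy for debugging. A minimal sketch, assuming no `Client` has been started in the session:

```python
import dask.array as da

x = da.ones((1000, 1000), chunks=(250, 250))  # 16 chunks

# With no distributed Client running, dask.array uses a local thread pool.
threaded_total = x.sum().compute()

# The synchronous scheduler runs tasks one at a time in this process,
# which makes tracebacks and pdb debugging much easier.
sync_total = x.sum().compute(scheduler="synchronous")

print(threaded_total, sync_total)
```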


## Make arrays, do stuff

```python
import dask.array as da

x = da.random.random((16_000, 16_000), chunks=(2000, 2000))
x
```

| | Array | Chunk |
|---|---|---|
| Bytes | 1.91 GiB | 30.52 MiB |
| Shape | (16000, 16000) | (2000, 2000) |
| Count | 64 Tasks | 64 Chunks |
| Type | float64 | numpy.ndarray |
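The summary above says the array is cut into 64 blocks (an 8 by 8 grid of 2000 by 2000 chunks). Here is a small sketch, on a toy array, of the attributes that describe that block layout:

```python
import dask.array as da

# A small version of the array above: 16 by 16 values in 4 by 4 blocks.
x = da.random.random((16, 16), chunks=(4, 4))

print(x.numblocks)    # (4, 4): number of blocks along each axis
print(x.npartitions)  # 16: total number of blocks
print(x.chunks)       # ((4, 4, 4, 4), (4, 4, 4, 4)): block sizes per axis

# Chunks need not divide the shape evenly; the last block is smaller.
y = da.random.random((10, 10), chunks=(4, 4))
print(y.chunks)       # ((4, 4, 2), (4, 4, 2))

# rechunk() changes the block layout (itself a lazy operation).
z = x.rechunk((8, 16))
print(z.numblocks)    # (2, 1)
```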


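This Dask array supports most of the NumPy interface, so you can write expressions as you would with NumPy (linear algebra lives in `dask.array.linalg` rather than `np.linalg`, and covers only part of it). For example: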

```python
y = x + x.T
z = (y[::2, 1000:-1000]**2).mean(axis=1)
z
```

| | Array | Chunk |
|---|---|---|
| Bytes | 62.50 kiB | 7.81 kiB |
| Shape | (8000,) | (1000,) |
| Count | 408 Tasks | 8 Chunks |
| Type | float64 | numpy.ndarray |


This has not actually been computed yet: `z` is just a task graph describing the computation.
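To see both the laziness and the NumPy-like interface, here is the same kind of expression on an array small enough to check against plain NumPy. A minimal sketch; the 4 by 4 input is just for illustration, and `da.from_array` wraps an existing in-memory array in blocks:

```python
import dask
import dask.array as da
import numpy as np

a = np.arange(16.0).reshape(4, 4)
x = da.from_array(a, chunks=(2, 2))  # wrap the NumPy array in 2 by 2 blocks

# Same shape of expression as above; this only builds a task graph.
lazy = ((x + x.T)[::2, 1:-1] ** 2).mean(axis=1)
print(dask.is_dask_collection(lazy))  # True: still a graph, not numbers

# compute() executes the graph and returns a plain NumPy array.
result = lazy.compute()
expected = ((a + a.T)[::2, 1:-1] ** 2).mean(axis=1)
print(np.allclose(result, expected))  # True
```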

When you want the result as a NumPy array, call `compute()`:

```python
z.compute()  # Note that this blocks (i.e. waits for the result).
```
```
array([1.17367514, 1.17736514, 1.17381643, ..., 1.16174151, 1.15940619,
       1.16690619])
```
Be careful: the whole result is loaded into memory in your Python session. Only do this if you know it is small enough to fit.
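Because a Dask array knows its shape and dtype before anything runs, you can check how big a result will be (via `nbytes`) before calling `compute()`. A minimal sketch; `compute_if_small` and its 100 MiB budget are hypothetical, not part of Dask:

```python
import dask.array as da

def compute_if_small(arr, max_bytes=100 * 2**20):
    """Hypothetical helper: compute `arr` only if its result fits in `max_bytes`.

    `arr.nbytes` is derived from shape and dtype, so checking it runs no tasks.
    """
    if arr.nbytes > max_bytes:
        raise ValueError(
            f"result would need {arr.nbytes / 2**20:.1f} MiB; "
            "reduce it further or write it to disk instead"
        )
    return arr.compute()

big = da.zeros((4000, 4000), chunks=(1000, 1000))  # about 122 MiB if computed
row_means = big.mean(axis=1)                        # result is only 31.25 KiB

result = compute_if_small(row_means)  # small enough: runs the graph

try:
    compute_if_small(big)             # too big: refused before any work is done
    refused = False
except ValueError as err:
    refused = True
    print(err)
```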

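Computing is the expensive bit, so call `compute()` as few times as possible. If you need several results, compute them in one call so that Dask can share the intermediate steps between them: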

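```python
import dask

# Don't do this: each compute() call recomputes z from scratch.
# mi, ma = z.min().compute(), z.max().compute()

# Do this: one call, so z is computed only once for both results.
mi, ma = dask.compute(z.min(), z.max())
mi, ma
```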
```
(1.1393791566433689, 1.1951163858454663)
```
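Why does one `dask.compute()` call beat two `.compute()` calls? Dask merges the graphs and runs any shared task only once. A minimal sketch with `dask.delayed` that counts how often a hypothetical expensive `load()` step actually runs:

```python
import dask

calls = []

@dask.delayed
def load():
    # Stand-in for an expensive shared step; record every real execution.
    calls.append(1)
    return list(range(10))

data = load()
lo = dask.delayed(min)(data)
hi = dask.delayed(max)(data)

# Two separate compute() calls: `load` runs once for each of them.
separate = (lo.compute(), hi.compute())
runs_separate = len(calls)

calls.clear()
# One dask.compute() call: both results share a single `load` task.
together = dask.compute(lo, hi)
runs_together = len(calls)

print(runs_separate, runs_together)  # 2 1
```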
Eventually you have a processed object that you want to query or operate on in various ways. If you have room, persist it in memory with `persist()`, so that later operations start from the stored result instead of recomputing the whole graph. It's much faster.

**And if it fits in the memory of a single machine, you can stop using `dask` at this point: `compute()` it and carry on with NumPy. Only continue with `dask` if your data still needs distributed memory.**

```python
import numpy as np

rms = np.sqrt(z)
rms
```

| | Array | Chunk |
|---|---|---|
| Bytes | 62.50 kiB | 7.81 kiB |
| Shape | (8000,) | (1000,) |
| Count | 416 Tasks | 8 Chunks |
| Type | float64 | numpy.ndarray |


```python
rms = rms.persist()  # Computation occurs in the background.
```
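A minimal sketch of why persisting pays off, run without a `Client` (so here `persist()` blocks until the blocks are computed, rather than working in the background). The hypothetical `expensive` function counts how many blocks are actually computed:

```python
import dask.array as da

calls = []

def expensive(block):
    # Stand-in for costly per-block work; record every real execution.
    calls.append(1)
    return block * 2

x = da.ones((8,), chunks=2)                     # 4 blocks
doubled = x.map_blocks(expensive, dtype=float)

calls.clear()  # map_blocks may call `expensive` on a tiny array while building the graph
doubled.sum().compute()
doubled.max().compute()
runs_without_persist = len(calls)  # each compute re-ran all 4 blocks

calls.clear()
kept = doubled.persist()           # run the 4 blocks once and keep the results
kept.sum().compute()
kept.max().compute()
runs_with_persist = len(calls)     # later computes reuse the stored blocks

print(runs_without_persist, runs_with_persist)  # 8 4
```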

To bring it into local memory as a NumPy array:

```python
rms = rms.compute()
rms
```

```
array([1.07965428, 1.07862667, 1.07074784, ..., 1.08183045, 1.07631351,
       1.08036942])
```

Now `rms` is just a NumPy array.

## Reading a large dataset

Dask has readers of its own, so in general you should avoid reading many files one by one and appending them to a Dask object.

From [the docs](https://docs.dask.org/en/latest/best-practices.html?highlight=persist#dataframes):

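```python
import pandas
import dask.dataframe as dd

# Don't do this (`ddf` is an existing Dask DataFrame,
# `filenames` a list of CSV paths):
for fn in filenames:
    df = pandas.read_csv(fn)  # Read locally with Pandas
    ddf = ddf.append(df)      # Give to Dask (`append` has since been deprecated)

# Do this: Dask reads the files itself, lazily and in parallel.
ddf = dd.read_csv(filenames)
```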

The same goes for reading HDF5 / NetCDF files, or …
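When a format has no built-in Dask reader, a common pattern is to wrap your own loader in `dask.delayed` and assemble the pieces with `da.from_delayed` and `da.concatenate`, so no file is read until you compute. A minimal sketch, assuming each file is a `.npy` array of known shape and dtype; the temporary directory and `part-*.npy` names are hypothetical:

```python
import os
import tempfile

import dask
import dask.array as da
import numpy as np

with tempfile.TemporaryDirectory() as tmpdir:
    # Write three small files to stand in for a directory of data files.
    filenames = []
    for i in range(3):
        fn = os.path.join(tmpdir, f"part-{i}.npy")  # hypothetical file names
        np.save(fn, np.full((4, 5), float(i)))
        filenames.append(fn)

    # Wrap the reader in dask.delayed so no file is opened yet.
    load = dask.delayed(np.load)

    # from_delayed needs each piece's shape and dtype up front.
    parts = [da.from_delayed(load(fn), shape=(4, 5), dtype=float) for fn in filenames]
    stack = da.concatenate(parts, axis=0)  # lazy (12, 5) array, one block per file

    result = stack.compute()  # the files are read here, in parallel
```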

## Tutorials and examples using `dask`

- [The Dask Tutorial](https://github.com/dask/dask-tutorial)
- There are [lots of machine learning examples](https://examples.dask.org/) using `sklearn` and PyTorch, among other tools.
- [Using Dask with seismic data via SEISNC](https://segysak.readthedocs.io/en/latest/examples/example_segysak_dask.html#Lazy-loading-from-SEISNC-using-chunking) (and read about [the SEISNC specification](https://segysak.readthedocs.io/en/latest/seisnc-standard.html))
- [Load large image data with Dask Array](https://blog.dask.org/2019/06/20/load-image-data)
- [Why Pangeo chose `dask`](https://stories.dask.org/en/latest/pangeo.html)

---

© 2021 Agile Scientific