<img src="https://docs.dask.org/en/latest/_images/dask_horizontal.svg" align="right" width="30%"/>

<center>
    <h1>
Dask Tutorial for PyHEP 2022
    </h1>
</center>

Dask is a pure Python library for parallel and distributed computing designed to scale up workflows in the PyData ecosystem.

The two main components of Dask:

- The **collection libraries** (sometimes called "Dask core"), listed here in alphabetical order:
  - `dask.array`: chunked NumPy
  - `dask.bag`: partitioned Python iterables
  - `dask.dataframe`: partitioned Pandas
  - `dask.delayed`: custom algorithms
- The **execution engines** (task schedulers)
  - The distributed engine is its own project (`distributed`, sometimes called "Dask Distributed")
 

<div style="text-align: center;">
  <img src="https://docs.dask.org/en/stable/_images/dask-overview.svg" align="center" width="70%"/>
</div>

There are also a number of other projects in the Dask ecosystem that leverage both upstream components.

First Example (using `dask.delayed`)
------------------------------------

We'll start with a simple `dask.delayed` example that covers _a lot_ of how Dask works:

In [1]:
import dask
from dask.delayed import delayed

def inc(x):
    return x + 1

inc = delayed(inc)

In [2]:
inc(7)

Delayed('inc-e0cde641-0587-4cec-a4c9-607e98654968')

Notice that this just creates a `Delayed` object.

In [3]:
eight = inc(7)

We have to ask Dask to compute the result via `compute()`:

In [4]:
eight.compute()

8
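To make the laziness concrete, here is a minimal sketch that records every real function call in a list. The names `traced_inc` and `calls` are illustrative and not part of the tutorial. Wrapping and calling the function only builds a graph node; the function body should run only once `compute()` is called.

```python
from dask import delayed

calls = []  # records the argument of every real function call

def traced_inc(x):
    calls.append(x)
    return x + 1

lazy_eight = delayed(traced_inc)(7)  # builds a Delayed object; nothing runs yet
calls_before = list(calls)

result = lazy_eight.compute()        # executes the graph
calls_after = list(calls)

print(calls_before, result, calls_after)
```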

We can start to construct a more complex task graph by chaining function calls:

In [5]:
@delayed
def inc(x):
    return x + 1

@delayed
def add(x, y):
    return x + y

In [6]:
five = add(inc(1), inc(2))

In [7]:
five.compute()

5
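Chained calls can also share intermediate nodes. In the sketch below (the `inc_calls` list and the `shared` variable are illustrative), two results depend on the same `inc(1)` node. Passing both to `dask.compute` merges their graphs, so the shared node should be executed once rather than twice.

```python
import dask
from dask import delayed

inc_calls = []  # records every real call to inc

@delayed
def inc(x):
    inc_calls.append(x)
    return x + 1

@delayed
def add(x, y):
    return x + y

shared = inc(1)               # one graph node, reused twice below
a = add(shared, 10)
b = add(shared, 20)

results = dask.compute(a, b)  # merges both graphs, then runs them together
print(results, inc_calls)
```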

We can inspect the complete task graph to see how Dask computes the result of the collection:

In [8]:
delayed_task_graph = five.dask.to_dict()

In [9]:
for i, (k, v) in enumerate(delayed_task_graph.items()):
    if i != 0:
        print("\n")
    print("The key (label) of a task:   ", k)
    print("The task itself (Lisp S-exp):", v)

The key (label) of a task:    add-8d05b2de-6a47-4f03-a386-fe14a91fdad8
The task itself (Lisp S-exp): (<function add at 0x12e5dc670>, 'inc-61bbe258-c26c-4122-bc10-4323d5de091b', 'inc-e09adbe9-f352-495a-8624-4ce9dc5f3d41')


The key (label) of a task:    inc-61bbe258-c26c-4122-bc10-4323d5de091b
The task itself (Lisp S-exp): (<function inc at 0x12e5dc9d0>, 1)


The key (label) of a task:    inc-e09adbe9-f352-495a-8624-4ce9dc5f3d41
The task itself (Lisp S-exp): (<function inc at 0x12e5dc9d0>, 2)
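The printed graph is a dictionary that maps keys to tuples of the form `(function, *arguments)`, where an argument may be the key of another task. As a rough illustration of how such a structure can be evaluated, here is a toy recursive evaluator in plain Python. It is only a sketch: `toy_get` and the short keys are made up for this example, this is not Dask's scheduler (which also handles ordering, parallelism, and memory), and the internal task representation may differ between Dask versions.

```python
from operator import add

def inc(x):
    return x + 1

# Same shape as the graph printed above, with short keys for readability.
graph = {
    "inc-1": (inc, 1),
    "inc-2": (inc, 2),
    "add": (add, "inc-1", "inc-2"),
}

def toy_get(graph, key, cache=None):
    """Recursively evaluate `key`; a toy stand-in for a real scheduler."""
    cache = {} if cache is None else cache
    if key in cache:
        return cache[key]
    func, *args = graph[key]
    # An argument that is itself a key refers to another task's result.
    resolved = [
        toy_get(graph, a, cache) if isinstance(a, str) and a in graph else a
        for a in args
    ]
    cache[key] = func(*resolved)
    return cache[key]

result = toy_get(graph, "add")
print(result)
```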


There is a much better way to inspect the graph: `visualize()`.

In [10]:
five.visualize(engine="cytoscape")

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

Second Example
--------------

Let's take a look at an example that illustrates something closer to a real workflow: reading and operating on files to produce a histogram. Our example will have three steps:

1. Load an uproot TTree by file and tree name
2. Calculate something from information in the file
3. Histogram the calculation

We'll look at the workflow with Dask and compare it to the same workflow without Dask.

In [11]:
import uproot
import awkward as ak
import hist
import time
from skhep_testdata import data_path
paths = [data_path("uproot-Zmumu.root")] * 4

In [12]:
@delayed
def read_tree(file_name, tree_name):
    time.sleep(1)  # faking making the file larger
    return uproot.open(file_name)[tree_name]

@delayed
def calculation(tree):
    arrs = tree.arrays()
    return abs(arrs.E1 - arrs.E2)
    
@delayed
def histo(data, bins, range):
    h = hist.Hist(hist.axis.Regular(bins=bins, start=range[0], stop=range[1], name="abs(E1-E2)"))
    h.fill(data)
    return h

In [13]:
histos = []
for p in paths:
    tree = read_tree(p, "events")
    calc = calculation(tree)
    h = histo(calc, 20, (0, 200))
    histos.append(h)

In [14]:
histos

[Delayed('histo-fd273546-e128-458a-b0e9-3dedde455ba1'),
 Delayed('histo-5ee159f8-a67c-42eb-8440-1366b9f4c9bf'),
 Delayed('histo-619e7ead-7cd4-491f-92f8-a08b97e8e2bb'),
 Delayed('histo-fbfdf677-af29-409e-b31f-f4bad89dfdf3')]

In [15]:
sum(histos).visualize(engine="cytoscape")

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [16]:
sum(histos).compute()
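The built-in `sum` works on a list of `Delayed` objects because `Delayed` supports arithmetic operators: each `+` adds another task to the graph instead of computing anything. A small sketch with integers in place of histograms (the word list is made up for illustration):

```python
from dask import delayed
from dask.delayed import Delayed

lengths = [delayed(len)(word) for word in ["a", "bb", "ccc"]]

total = sum(lengths)                  # 0 + lengths[0] + lengths[1] + ... adds tasks
is_lazy = isinstance(total, Delayed)  # still lazy at this point
value = total.compute()
print(is_lazy, value)
```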

In [17]:
%%timeit
sum(histos).compute()

1.19 s ± 10.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Now without Dask:

In [18]:
def s_read_tree(file_name, tree_name):
    time.sleep(1)  # faking making the file larger
    return uproot.open(file_name)[tree_name]

def s_calculation(tree):
    arrs = tree.arrays()
    return abs(arrs.E1 - arrs.E2)
    
def s_histo(data, bins, range):
    h = hist.Hist(hist.axis.Regular(bins=bins, start=range[0], stop=range[1], name="abs(E1-E2)"))
    h.fill(data)
    return h

In [19]:
%%timeit
s_histos = []
for p in paths:
    tree = s_read_tree(p, "events")
    calc = s_calculation(tree)
    h = s_histo(calc, 20, (0, 200))
    s_histos.append(h)
sum(s_histos)

4.3 s ± 9.38 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
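The roughly fourfold gap is consistent with the four one-second sleeps overlapping when independent tasks run on a pool of threads. A Dask-free sketch of the same effect with the standard library is shown below; `slow_read` and the 0.2 s delay are illustrative stand-ins for the file reads above.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_read(i, delay=0.2):
    time.sleep(delay)  # stands in for I/O; sleeping releases the GIL
    return i

start = time.perf_counter()
serial = [slow_read(i) for i in range(4)]
serial_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    threaded = list(pool.map(slow_read, range(4)))
threaded_time = time.perf_counter() - start

print(f"serial: {serial_time:.2f} s, threaded: {threaded_time:.2f} s")
```

The serial loop pays for each delay in turn, while the threaded version waits for all four at once.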


dask.array
==========

While `dask.delayed` is incredibly flexible and can turn almost any Python function into a node in a task graph, the other collection libraries are designed to provide task graph creation as a near drop-in replacement for existing PyData libraries. The NumPy API is meant to be recreated with `dask.array`. Arrays in `dask.array` are chunked and lazily evaluated NumPy arrays. The data nodes in a task graph are just NumPy arrays: Dask doesn't create a new array computation kernel library.

<center>
<img src="https://docs.dask.org/en/stable/_images/dask-array.svg" width="50%">
</center>

In [20]:
import numpy as np
import dask.array as da

In [21]:
a1 = np.ones((10,))

In [22]:
a1.sum()

10.0

In [23]:
a1[:5].sum() + a1[5:].sum()

10.0

In [24]:
a2 = da.ones((10,), chunks=5)

In [25]:
a2

| | Array | Chunk |
|---|---|---|
| Bytes | 80 B | 40 B |
| Shape | (10,) | (5,) |
| Count | 1 Graph Layer | 2 Chunks |
| Type | float64 | numpy.ndarray |


In [26]:
a2.sum()

| | Array | Chunk |
|---|---|---|
| Bytes | 8 B | 8 B |
| Shape | () | () |
| Count | 3 Graph Layers | 1 Chunks |
| Type | float64 | numpy.ndarray |


In [27]:
a3 = a2 + 1
dask.compute(a2.sum(), a3)

(10.0, array([2., 2., 2., 2., 2., 2., 2., 2., 2., 2.]))
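The chunk layout and the lazy/eager boundary can be checked directly. The sketch below uses a small array made up for illustration: `.chunks` reports the block sizes along each axis, NumPy-style operations return new Dask arrays, and `compute()` returns a plain NumPy array.

```python
import numpy as np
import dask.array as da

x_np = np.arange(12, dtype=float).reshape(3, 4)
x_da = da.from_array(x_np, chunks=(3, 2))  # two chunks of shape (3, 2)

chunk_layout = x_da.chunks                 # block sizes along each axis
lazy = (x_da * 2).sum(axis=0)              # still a Dask array, nothing computed
computed = lazy.compute()                  # a plain NumPy array

print(chunk_layout, type(computed).__name__, computed)
```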

Chaining together function calls with `dask.array` is very similar to what we did with `dask.delayed`: it simply builds up the task graph. However, now we get to use the ubiquitous NumPy API.

In [28]:
x = da.ones((25000, 25000), chunks=(5000, 5000))
x

| | Array | Chunk |
|---|---|---|
| Bytes | 4.66 GiB | 190.73 MiB |
| Shape | (25000, 25000) | (5000, 5000) |
| Count | 1 Graph Layer | 25 Chunks |
| Type | float64 | numpy.ndarray |
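The sizes in the summary above follow from the shape, the chunk shape, and 8 bytes per `float64` element. A quick arithmetic check in plain Python:

```python
shape = (25000, 25000)
chunks = (5000, 5000)
itemsize = 8  # bytes per float64 element

total_bytes = shape[0] * shape[1] * itemsize
chunk_bytes = chunks[0] * chunks[1] * itemsize
n_chunks = (shape[0] // chunks[0]) * (shape[1] // chunks[1])

summary = f"{total_bytes / 2**30:.2f} GiB, {chunk_bytes / 2**20:.2f} MiB, {n_chunks} chunks"
print(summary)
```

The full array would not fit comfortably in memory on many laptops, while each individual chunk is a manageable NumPy array.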


In [29]:
y = x + x.T
z = da.mean(y[::2, :5000:2], axis=1)

In [30]:
z.visualize(engine="cytoscape", color="order")

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [31]:
z.visualize(engine="cytoscape", color="order", optimize_graph=True)

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [32]:
z

| | Array | Chunk |
|---|---|---|
| Bytes | 97.66 kiB | 19.53 kiB |
| Shape | (12500,) | (2500,) |
| Count | 6 Graph Layers | 5 Chunks |
| Type | float64 | numpy.ndarray |


In [33]:
z.compute()

array([2., 2., 2., ..., 2., 2., 2.])
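To see why every element is 2, the same expression can be run eagerly on a scaled-down NumPy array. The 10×10 shape and the 4-column slice below are stand-ins chosen for illustration, not the tutorial's sizes.

```python
import numpy as np

x = np.ones((10, 10))
y = x + x.T                    # every element is 1 + 1 = 2
z = y[::2, :4:2].mean(axis=1)  # every other row; every other column of the first 4

print(z.shape, z)
```

The mean of a row of 2s is 2, and taking every other row halves the length of the result, which matches the `(12500,)` shape reported for the Dask version.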

dask.dataframe
==============

The NumPy/`dask.array` relationship is mirrored for pandas with `dask.dataframe`. DataFrames (and Series) in `dask.dataframe` are partitioned, lazily evaluated pandas DataFrames (and Series). The data nodes in a task graph are pandas objects.

<center>
<img src="https://docs.dask.org/en/stable/_images/dask-dataframe.svg" width="35%">
</center>

In [34]:
from dask.datasets import timeseries

In [35]:
ddf = timeseries()

In [36]:
ddf

| | id | name | x | y |
|---|---|---|---|---|
| **npartitions=30** | | | | |
| 2000-01-01 | int64 | object | float64 | float64 |
| 2000-01-02 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| 2000-01-30 | ... | ... | ... | ... |
| 2000-01-31 | ... | ... | ... | ... |


In [45]:
ddf.groupby("name")[["x", "y"]].mean().compute()

| name | x | y |
|---|---|---|
| Alice | -0.000339 | 0.001541 |
| Bob | -0.001554 | -0.0001 |
| Charlie | -0.001189 | 0.005744 |
| Dan | 0.001327 | 0.000629 |
| Edith | 0.000268 | -8.7e-05 |
| Frank | -0.001845 | 0.000122 |
| George | 0.001186 | -0.000571 |
| Hannah | 0.000356 | -0.004046 |
| Ingrid | 0.000678 | 0.000631 |
| Jerry | -0.002163 | 0.000935 |
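A grouped mean over partitioned data cannot be obtained by averaging per-partition means, but per-partition sums and counts can be combined. The pandas-only sketch below illustrates that idea on a made-up six-row frame split into two "partitions". It is meant to convey the concept and should not be read as a description of Dask's actual implementation.

```python
import pandas as pd

pdf = pd.DataFrame({
    "name": ["Alice", "Bob", "Alice", "Bob", "Alice", "Bob"],
    "x": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
})

# Split the rows into two "partitions", each an ordinary pandas DataFrame.
partitions = [pdf.iloc[:4], pdf.iloc[4:]]

# Per-partition partial results: sums and counts can be combined, means cannot.
partials = [p.groupby("name")["x"].agg(["sum", "count"]) for p in partitions]

combined = pd.concat(partials).groupby(level=0).sum()
partitioned_mean = combined["sum"] / combined["count"]

direct_mean = pdf.groupby("name")["x"].mean()  # reference result on the full frame
print(partitioned_mean)
```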


distributed
===========

So far we've been using the default execution engine at each `.compute()` call: a local scheduler that runs tasks on a pool of threads, using all discovered cores. This parallelizes computation on a single machine, such as a laptop.
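The local engine can also be chosen explicitly with the `scheduler` keyword of `compute()`. A minimal sketch, assuming the threaded and synchronous (single-threaded) local schedulers; the small graph is made up for illustration:

```python
from dask import delayed

@delayed
def inc(x):
    return x + 1

total = delayed(sum)([inc(i) for i in range(4)])

threaded = total.compute(scheduler="threads")    # local thread pool
serial = total.compute(scheduler="synchronous")  # single thread, handy for debugging
print(threaded, serial)
```

The graph is the same in both cases; only the engine that executes it changes.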

The distributed engine uses a client + cluster model, where the cluster is composed of a scheduler and any number of workers.

<center>
<img src="images/distributed.png" width="60%"/>
</center>

In [38]:
from distributed import Client, LocalCluster
cluster = LocalCluster()

2022-09-15 11:34:22,525 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/g1/wjgd2b0x2l538zwy6f6nx8qc0000gp/T/dask-worker-space/worker-i43qeu2b', purging
2022-09-15 11:34:22,527 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/g1/wjgd2b0x2l538zwy6f6nx8qc0000gp/T/dask-worker-space/worker-szdr9fnu', purging
2022-09-15 11:34:22,527 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/g1/wjgd2b0x2l538zwy6f6nx8qc0000gp/T/dask-worker-space/worker-f6j615uk', purging
2022-09-15 11:34:22,528 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/g1/wjgd2b0x2l538zwy6f6nx8qc0000gp/T/dask-worker-space/worker-d4otld7u', purging
2022-09-15 11:34:22,529 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/g1/wjgd2b0x2l538zwy6f6nx8qc0000gp/T/dask-worker-space/worker-zc31j7io', purging


In [39]:
cluster

| Cluster | |
|---|---|
| Dashboard | http://127.0.0.1:8787/status |
| Workers | 5 |
| Total threads | 10 |
| Total memory | 16.00 GiB |
| Status | running |
| Using processes | True |

| Scheduler | |
|---|---|
| Comm | tcp://127.0.0.1:62181 |
| Workers | 5 |
| Dashboard | http://127.0.0.1:8787/status |
| Total threads | 10 |
| Started | Just now |
| Total memory | 16.00 GiB |

| Worker comm | Dashboard | Nanny | Total threads | Memory | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:62205 | http://127.0.0.1:62206/status | tcp://127.0.0.1:62186 | 2 | 3.20 GiB | /var/folders/g1/wjgd2b0x2l538zwy6f6nx8qc0000gp/T/dask-worker-space/worker-hrpo4jxf |
| tcp://127.0.0.1:62199 | http://127.0.0.1:62201/status | tcp://127.0.0.1:62185 | 2 | 3.20 GiB | /var/folders/g1/wjgd2b0x2l538zwy6f6nx8qc0000gp/T/dask-worker-space/worker-u7krfizy |
| tcp://127.0.0.1:62211 | http://127.0.0.1:62212/status | tcp://127.0.0.1:62188 | 2 | 3.20 GiB | /var/folders/g1/wjgd2b0x2l538zwy6f6nx8qc0000gp/T/dask-worker-space/worker-7gnmven4 |
| tcp://127.0.0.1:62200 | http://127.0.0.1:62202/status | tcp://127.0.0.1:62184 | 2 | 3.20 GiB | /var/folders/g1/wjgd2b0x2l538zwy6f6nx8qc0000gp/T/dask-worker-space/worker-l2u0a_r4 |
| tcp://127.0.0.1:62208 | http://127.0.0.1:62209/status | tcp://127.0.0.1:62187 | 2 | 3.20 GiB | /var/folders/g1/wjgd2b0x2l538zwy6f6nx8qc0000gp/T/dask-worker-space/worker-a65qwysm |


In [40]:
client = Client(cluster)

In [41]:
client

| Client | |
|---|---|
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | http://127.0.0.1:8787/status |



The ecosystem of clusters is large!

- [dask-jobqueue](https://jobqueue.dask.org/en/latest/index.html) (HTCondor, Slurm, PBS, and more)
- [dask-cloudprovider](https://docs.dask.org/en/latest/deploying-cloud.html)
- [dask-kubernetes](https://kubernetes.dask.org/en/latest/)
- [dask-gateway](https://gateway.dask.org/)
- ...

Simplified example:

```python
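from distributed import Client
from dask_jobqueue import HTCondorCluster

# Per-job resources (placeholder values; they can also come from the dask-jobqueue config).
cluster = HTCondorCluster(cores=1, memory="2GB", disk="1GB")
cluster.scale(10)  # ask for 10 workers
client = Client(cluster)

# `my_big_task` stands for any Dask collection built beforehand.
discovery = client.compute(my_big_task)  # returns a Future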
```

**Note for folks using Binder**: the dashboard part of the live tutorial will be unavailable.

In [44]:
z.compute()

array([2., 2., 2., ..., 2., 2., 2.])
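Once a `Client` exists, `compute()` calls like the one above are routed to the cluster instead of the local thread pool. The client also offers a futures interface. The sketch below is a small self-contained variant, assuming an in-process cluster (`processes=False`) with the dashboard disabled so that it runs without extra ports or subprocesses; those settings are choices for this example, not the tutorial's configuration.

```python
from distributed import Client, LocalCluster

def inc(x):
    return x + 1

# processes=False keeps workers in this process; dashboard_address=None skips the dashboard.
with LocalCluster(n_workers=1, threads_per_worker=2, processes=False,
                  dashboard_address=None) as cluster:
    with Client(cluster) as client:
        future = client.submit(inc, 7)  # runs on a worker, returns a Future immediately
        result = future.result()        # blocks until the value is available

print(result)
```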

Adding to the ecosystem!
========================

[dask-awkward](https://github.com/ContinuumIO/dask-awkward) and [dask-histogram](https://github.com/dask-contrib/dask-histogram): Native support for awkward arrays and boost-histogram objects in Dask.