# Drag the blue LocalCluster box from the Dask tab to the cell below

# Math with Dask arrays

In [None]:
import dask
import dask.array as da
import numpy as np

In [None]:
x = da.array([1,2,3])
y = da.array([5,7,9])

f_xy = 5*x + y
f_xy.visualize()

So far we have not evaluated the function:

In [None]:
f_xy

We can evaluate it with the `.compute()` method or by calling `dask.compute()`:

In [None]:
f_xy.compute()  # only now is the calculation being performed

Compare this to the following _eager_ approach, where the calculation runs immediately.

In [None]:
x_np = np.asarray([1,2,3])
y_np = np.asarray([5,7,9])

f_xy_np = 5*x_np + y_np
print(f_xy_np)
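A toy sketch in plain Python shows the difference between the lazy and eager versions. The `Lazy` class, the `leaf` helper and the `evaluated` list are hypothetical. They only mimic the idea behind Dask collections: arithmetic records a node in a graph, and no input is read until `.compute()` walks that graph. Dask's real implementation is far more involved.

```python
evaluated = []  # records every input that was actually read


class Lazy:
    """Toy stand-in for a lazy array: stores an operation instead of running it."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def __add__(self, other):
        # element-wise addition, recorded as a new graph node
        return Lazy(lambda a, b: [u + v for u, v in zip(a, b)], self, other)

    def __rmul__(self, scalar):
        # handles `scalar * lazy`, also recorded as a new graph node
        return Lazy(lambda a: [scalar * v for v in a], self)

    def compute(self):
        # evaluate dependencies first, then apply this node's function
        values = [arg.compute() if isinstance(arg, Lazy) else arg for arg in self.args]
        return self.func(*values)


def leaf(values):
    """Wrap input data in a graph node that is only read on compute()."""
    def load():
        evaluated.append(values)  # log that this input was read
        return list(values)
    return Lazy(load)


x_lazy = leaf([1, 2, 3])
y_lazy = leaf([5, 7, 9])
f_lazy = 5 * x_lazy + y_lazy  # builds the graph only
print(len(evaluated))         # 0: nothing has been read yet
print(f_lazy.compute())       # [10, 17, 24]
```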

# Handling HEP data

In [None]:
import awkward as ak
import hist.dask
import uproot

In [None]:
fname = {"https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneCUETP8M1_13TeV-powheg-pythia8/"\
         "cmsopendata2015_ttbar_19980_PU25nsData2015v1_76X_mcRun2_asymptotic_v12_ext3-v1_00000_0000.root": "Events"}

# reading data eagerly, without Dask
with uproot.open(fname) as f:
    print(f["Electron_pt"].array())

This might take a few seconds to run. Let's now compare this to building a Dask graph for the same data:

In [None]:
evts = uproot.dask(fname, steps_per_file=2)
arr = evts.Electron_pt

This also takes a moment to run: uproot reads part of the file metadata to find out what the file contains. This is useful because some invalid graphs can be detected right away, before any data is processed. One example is a graph that accesses a branch that does not exist:

In [None]:
evts["some_variable_that_does_not_exist"]
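A missing branch can be reported before any data is read because the branch names are already known from the file metadata. The following toy version of this check uses a hypothetical `LazyTree` class. It is not uproot's implementation, and the exception type uproot raises may differ from the `KeyError` used here.

```python
class LazyTree:
    """Toy lazy tree: knows its branch names from metadata but reads no data."""

    def __init__(self, branch_names):
        # in uproot, this information comes from the file metadata
        self.branch_names = set(branch_names)

    def __getitem__(self, name):
        # validate while building the graph, before any data is loaded
        if name not in self.branch_names:
            raise KeyError(f"no branch named {name!r}")
        return ("read-branch", name)  # placeholder for a lazy graph node


tree = LazyTree(["Electron_pt", "Electron_eta", "nElectron"])
node = tree["Electron_pt"]  # fine: only a graph node is created
try:
    tree["some_variable_that_does_not_exist"]
except KeyError as err:
    print("caught early:", err)
```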

In [None]:
arr.visualize(optimize_graph=True)

We could now call `.compute()` on the `arr` object, but let's go one step further and add histogram filling to our graph.

In [None]:
h = hist.dask.Hist.new.Regular(100, 0, 250, label="electron pT [GeV]").Double()
# note how this looks very similar to the non-Dask version
# hist.Hist.new.Regular(100, 0, 250, label="electron pT [GeV]").Double()

def my_data(evts):
    return ak.flatten(evts.Electron_pt)

task = h.fill(my_data(evts))

dask.visualize(task, optimize_graph=True)

In [None]:
h_computed, *_ = dask.compute(task)
h_computed.plot()
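Histogram filling parallelizes well because histograms filled from different chunks of data can simply be added. Conceptually, each chunk fills its own partial histogram and the partial results are summed. Below is a plain-Python sketch of a `Regular(100, 0, 250)` axis, which has a bin width of 2.5 GeV. The helper names are hypothetical. `hist` keeps out-of-range entries in underflow and overflow bins by default, but this toy simply drops them.

```python
def regular_bin_index(value, nbins=100, lo=0.0, hi=250.0):
    """Bin index on a regular axis, or None if the value is out of range."""
    if not lo <= value < hi:
        return None
    width = (hi - lo) / nbins  # 2.5 GeV for Regular(100, 0, 250)
    # min() guards against rounding pushing a value just below `hi` out of range
    return min(int((value - lo) // width), nbins - 1)


def fill(values, nbins=100, lo=0.0, hi=250.0):
    """Fill a list of bin counts from one chunk of data."""
    counts = [0] * nbins
    for value in values:
        index = regular_bin_index(value, nbins, lo, hi)
        if index is not None:
            counts[index] += 1
    return counts


def merge(*partial_counts):
    """Add partial histograms bin by bin."""
    return [sum(bins) for bins in zip(*partial_counts)]


chunk_a = [10.0, 12.4, 251.0]  # 251.0 is above the axis range and is dropped
chunk_b = [2.5, 12.6, 249.9]
total = merge(fill(chunk_a), fill(chunk_b))
print(total == fill(chunk_a + chunk_b))  # True: merging partial fills equals one big fill
```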

# The dashboard

Dask provides a dashboard with a lot of useful information: https://docs.dask.org/en/latest/dashboard.html.

You can access it from the JupyterHub menu: "File" -> "Launch Dask Dashboard Layout". You can also click one of the yellow buttons in the Dask tab on the left to get access to lots of additional graphs.

In [None]:
import dask.bag
import numpy as np
import time

Our data here is an array of numbers that we pack into a Dask `bag`.

In [None]:
NUM_TASKS = 8

x = np.linspace(1, NUM_TASKS, NUM_TASKS)
b = dask.bag.from_sequence(x, npartitions=len(x))

In [None]:
b.visualize()

In [None]:
def my_function(num):
    time.sleep(1)  # simulate one second of work
    return num

mapped = b.map(my_function)  # apply the function to each entry of the bag (still lazy)
mystery_task = mapped.fold(lambda x, y: x + y, split_every=4)  # parallel reduction pattern
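The eight `my_function` calls are independent of each other, so Dask can run them at the same time on different workers. The standard library's `concurrent.futures` thread pool can illustrate the same effect. This is a stand-in for illustration, not Dask's scheduler, and the sleep is shortened to 0.2 s so the example runs quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def my_function(num):
    time.sleep(0.2)  # shortened stand-in for the 1 s of work in the notebook
    return num


values = [float(i) for i in range(1, 9)]  # same values as np.linspace(1, 8, 8)

# one call after another
start = time.perf_counter()
serial = [my_function(v) for v in values]
serial_time = time.perf_counter() - start

# all calls at once, one thread per value
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(values)) as pool:
    parallel = list(pool.map(my_function, values))
parallel_time = time.perf_counter() - start

print(f"serial: {serial_time:.2f} s, parallel: {parallel_time:.2f} s")
```

With enough workers, the wall time is set by the slowest single call rather than by the sum of all calls.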

In [None]:
mystery_task.visualize(optimize_graph=True)

**What does this `mystery_task` do?** What result do we expect?

In [None]:
result, *_ = dask.compute(mystery_task)
print(result)
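Spoiler for the question above: `fold` with `lambda x, y: x + y` sums all entries, so the result is 1 + 2 + ... + 8 = 36. The `split_every=4` argument controls the shape of the reduction tree: partial results are combined in groups of at most four. With eight partitions, the eight partial results are roughly combined into two, and those two into the final value. The `tree_reduce` helper below is a hypothetical plain-Python sketch of this pattern, not Dask's code.

```python
import operator
from functools import reduce


def tree_reduce(values, binop, split_every=4):
    """Reduce `values` level by level, combining at most `split_every` items per step.

    Returns the final value and the number of levels in the reduction tree.
    """
    level = list(values)
    if not level:
        raise ValueError("tree_reduce needs at least one value")
    depth = 0
    while len(level) > 1:
        # each group could be reduced by a different worker in parallel
        level = [reduce(binop, level[i:i + split_every])
                 for i in range(0, len(level), split_every)]
        depth += 1
    return level[0], depth


values = [float(i) for i in range(1, 9)]  # same values as np.linspace(1, 8, 8)
print(tree_reduce(values, operator.add, split_every=4))  # (36.0, 2)
print(tree_reduce(values, operator.add, split_every=2))  # (36.0, 3)
```

A smaller `split_every` gives a deeper tree with more, smaller combine steps.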

### Exercise

Look at the "Graph" visualization in the Dask dashboard. It shows the progress of the computation while the cell above is running. Change the task so that the graph becomes too large to be visualized in the dashboard! You may want to comment out the visualizations above in this notebook, since they might also become huge.

# Using `coffea`

`coffea` provides a convenient interface for handling our data through its schemas, such as the `NanoAODSchema` used below.

In [None]:
import warnings
warnings.filterwarnings("ignore")

from coffea.nanoevents import NanoEventsFactory, NanoAODSchema

In [None]:
evts = NanoEventsFactory.from_root(fname, schemaclass=NanoAODSchema).events()

evts.Electron

A full physics analysis might contain a lot of code, but we can organize it in functions in ways we find convenient.

In [None]:
def get_mass(electrons):
    """invariant mass of the system formed by the first two electrons in each event"""
    return (electrons[:, 0:1] + electrons[:, 1:2]).mass

def my_analysis(evts):
    """calculate the di-electron mass of events with exactly 2 electrons"""
    evts = evts[ak.num(evts.Electron, axis=1) == 2]
    mass = get_mass(evts.Electron)
    return mass

coffea_task = my_analysis(evts)
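Adding two electrons sums their four-momenta, and `.mass` is the invariant mass $m = \sqrt{E^2 - |\vec{p}|^2}$ of that sum. The plain-Python sketch below mirrors `my_analysis` on hypothetical toy events, where each event is a list of `(pt, eta, phi, mass)` tuples. It treats the electrons as massless, which is a good approximation at these momenta.

```python
import math


def to_cartesian(pt, eta, phi, mass):
    """Convert (pt, eta, phi, mass) to (E, px, py, pz)."""
    px = pt * math.cos(phi)
    py = pt * math.sin(phi)
    pz = pt * math.sinh(eta)
    energy = math.sqrt(px**2 + py**2 + pz**2 + mass**2)
    return energy, px, py, pz


def pair_mass(e1, e2):
    """Invariant mass of the sum of two four-momenta."""
    energy, px, py, pz = (a + b for a, b in zip(to_cartesian(*e1), to_cartesian(*e2)))
    m2 = energy**2 - px**2 - py**2 - pz**2
    return math.sqrt(max(m2, 0.0))  # clip tiny negative values from rounding


def toy_dielectron_mass(events):
    """Di-electron mass for events with exactly two electrons."""
    return [pair_mass(ele[0], ele[1]) for ele in events if len(ele) == 2]


toy_events = [
    [(45.6, 0.0, 0.0, 0.0), (45.6, 0.0, math.pi, 0.0)],  # back-to-back pair
    [(30.0, 1.2, 0.5, 0.0)],                             # one electron: skipped
]
print(toy_dielectron_mass(toy_events))  # approximately [91.2]
```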

Before optimization, the full task graph for this already contains quite a few nodes:

In [None]:
dask.visualize(coffea_task)

After optimization it is _much_ smaller:

In [None]:
dask.visualize(coffea_task, optimize_graph=True)

In [None]:
dask.compute(coffea_task)

Since `coffea_task` is a graph, we can also be clever about which data to load from the files we are processing: only the columns that are actually needed get loaded.

In [None]:
import dask_awkward as dak
dak.necessary_columns(coffea_task)
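Roughly speaking, dask-awkward finds the needed columns by running the graph on awkward "typetracer" arrays, which carry type information but no data, and recording which fields are touched. The following toy plain-Python version of this idea uses a hypothetical `ColumnRecorder` class that is much simpler than the real mechanism.

```python
class ColumnRecorder:
    """Stand-in for an event record that logs every field an analysis accesses."""

    def __init__(self, path="", touched=None):
        self._path = path
        self._touched = set() if touched is None else touched

    def __getattr__(self, name):
        # only called for names not set in __init__, i.e. data fields
        path = f"{self._path}.{name}" if self._path else name
        self._touched.add(path)
        return ColumnRecorder(path, self._touched)


def needed_columns(analysis):
    """Run `analysis` on a recorder and return the leaf fields it touched."""
    recorder = ColumnRecorder()
    analysis(recorder)
    touched = recorder._touched
    # keep only leaves: drop "Electron" when "Electron.pt" was also touched
    return sorted(p for p in touched if not any(q.startswith(p + ".") for q in touched))


def uses_electron_kinematics(evts):
    electrons = evts.Electron
    return electrons.pt, electrons.eta, electrons.phi, electrons.mass


print(needed_columns(uses_electron_kinematics))
# ['Electron.eta', 'Electron.mass', 'Electron.phi', 'Electron.pt']
```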

### Exercise

Create a task graph that fills the di-electron mass into a histogram. Then execute that graph and plot the histogram.

In [None]:
...  # your code