# A HEP analysis using dask

In this notebook, we utilize dask alongside [uproot](https://uproot.readthedocs.io/en/latest/), [awkward](https://awkward-array.readthedocs.io/en/latest/), and [hist](https://github.com/scikit-hep/hist/) to analyze some CMS Run I open data.

This analysis touches on a lot of the previous material, using dask delayed, bag, and dataframe objects in unison.
The complicated task graph that we eventually build and run will allow us to discuss some of the visualization and monitoring tools to inspect workflow progress.

In [None]:
import distributed

client = distributed.Client()
client

We need to teach dask about our HEP-specific data types, so it can make intelligent decisions about when to cache and when to reproduce intermediate results. The `patch.py` module in this directory does this for us by registering the appropriate object size functions with the `dask.sizeof` utility.

In [None]:
import patch
client.upload_file("patch.py")

Our inputs consist of 20 open data files from CMS data collection in 2012. We set up a delayed routine to fetch the `Events` tree out of each file.

In [None]:
import dask
from dask import delayed
import uproot


@delayed(pure=True)
def get_tree(url):    
    return uproot.open(url)["Events"]

urls = [
    f"root://eospublic.cern.ch//eos/root-eos/benchmark/CMSOpenDataDimuon/Run2012BC_DoubleMuParked_Muons_{i}.root"
    for i in range(1, 21)
]
inputs = [get_tree(url) for url in urls]
inputs[0]

We can compute one input and bring it back to our client to inspect the available columns in our data

In [None]:
inputs[0].compute().show()

With this information in hand, we can create an awkward array structure representing these muons, as Lorentz vectors with a charge attribute.

In [None]:
import awkward as ak
# NB: https://github.com/scikit-hep/vector is now in beta
from coffea.nanoevents.methods import vector


@delayed(pure=True)
def muon_struct(tree, entry_start, entry_stop):
    def get(name):
        return tree[name].array(entry_start=entry_start, entry_stop=entry_stop)

    return ak.zip(
        {
            "pt": get("Muon_pt"),
            "eta": get("Muon_eta"),
            "phi": get("Muon_phi"),
            "mass": get("Muon_mass"),
            "charge": get("Muon_charge"),
        },
        with_name="PtEtaPhiMLorentzVector",
        behavior=vector.behavior,
    )

When handling large arrays, we need to be careful to make sure that the chunks each have appropriate data volumes, namely somewhere on the order of 1-100 megabytes. We need to chunk the files (each being over 2 GB) and a convenient way to do that with uproot is to set entry (collision event) ranges. But first, we need to know how many events are in each file:

In [None]:
nevents, = dask.compute([t.num_entries for t in inputs])
nevents

Now we can declare a list of delayed objects representing chunks of event-muons:

In [None]:
import numpy as np

def chunks(n, target_size):
    edges = np.linspace(0, n, n // target_size, dtype=int)
    return zip(edges[:-1], edges[1:])

chunksize = 400_000
muons = [
    muon_struct(tree, start, stop)
    for tree, nev in zip(inputs, nevents)
    for start, stop in chunks(nev, chunksize)
]
len(muons), muons[0]

[persist](https://docs.dask.org/en/latest/api.html?highlight=persist#dask.persist) turns lazy Dask collections into Dask collections with the same metadata, but now with their results fully computed or actively computing in the background.

Let's persist the first chunk in the cluster so that we can quickly use it in subsequent testing. We'll also explicitly pull its value back to the client to do some preliminary inspection with matplotlib, plotting the number of muons per event as well as the invariant mass of dimuon pairs from events with exactly two muons.

In [None]:
muons[0] = muons[0].persist()
muons_chunk = muons[0].compute()

In [None]:
import matplotlib.pyplot as plt
import numpy as np

plt.hist(ak.num(muons_chunk), bins=range(6));

In [None]:
plt.hist(muons_chunk[ak.num(muons_chunk)==2].sum().mass, bins=np.geomspace(0.2, 200, 200));
plt.gca().set_xscale("log")

These plots look nice, but we'd rather be filling histograms in our distributed cluster, aggregating them, and returning the total back to the client. Let's see how we can do that with dask:

In [None]:
import hist

@delayed(pure=True)
def nmuons_plot(muons_chunk):
    return (
        hist.Hist.new
        .Reg(6, 0, 6, name="nMuons")
        .Double()
        .fill(ak.num(muons_chunk))
    )

@delayed(pure=True)
def filter_muons(muons_chunk):
    return muons_chunk[
        (ak.num(muons_chunk)==2)
        & (ak.sum(muons_chunk.charge, axis=1) == 0)
    ]

@delayed(pure=True)
def dimuon_cand(muons_chunk):
    return muons_chunk.sum()

@delayed(pure=True)
def mass_plot(cand_chunk):
    return (
        hist.Hist.new
        .Log(1000, 0.2, 200, name="mass", label="Di-muon invariant mass")
        .Double()
        .fill(cand_chunk.mass)
    )

We can use `dask.compute` to compute several delayed objects at once instead of calling the `.compute()` method on each in turn. This allows the scheduler to recognize shared components in the computation graph and ensure they are not executed more than necessary.

In [None]:
test_nmu, test_mass = dask.compute(
    nmuons_plot(muons[0]),
    mass_plot(dimuon_cand(filter_muons(muons[0])))
)
display(test_nmu, test_mass)

Suppose, in addition to the plots above, we also want to save a reduced dataset of dimuon candidate events, with each constituent muon's kinematic attributes corresponding to a column in a table. We can construct a pandas dataframe from our filtered muons and composite candidate:

In [None]:
import pandas as pd

@delayed(pure=True)
def cand_table(muons_chunk, cand_chunk):
    mupos = ak.firsts(muons_chunk[muons_chunk.charge == 1])
    muneg = ak.firsts(muons_chunk[muons_chunk.charge == -1])
    return pd.DataFrame({
        "mass": cand_chunk.mass,
        "pt": cand_chunk.pt,
        "mu+_pt": mupos.pt,
        "mu+_eta": mupos.eta,
        "mu+_phi": mupos.phi,
        "mu-_pt": muneg.pt,
        "mu-_eta": muneg.eta,
        "mu-_phi": muneg.phi,
    })


cand_table(
    filter_muons(muons[0]),
    dimuon_cand(filter_muons(muons[0])),
).compute()

As mentioned above, when calling `dask.compute` on multiple delayed objects, the scheduler first optimizes the task graph. Let's prove this. The `results` method creates the three delayed results we are interested in without any attempt to re-use shared components, as opposed to the `results_opt` method. Yet, if we visualize the task graph for either method applied to the first chunk of muon-events, we see no difference.

In [None]:
def results(muons_chunk):
    return (
        nmuons_plot(muons_chunk),
        mass_plot(dimuon_cand(filter_muons(muons_chunk))),
        cand_table(
            filter_muons(muons_chunk),
            dimuon_cand(filter_muons(muons_chunk)),
        ),
    )

def results_opt(muons_chunk):
    filtered = filter_muons(muons_chunk)
    cand = dimuon_cand(filtered)
    return (
        nmuons_plot(muons_chunk),
        mass_plot(cand),
        cand_table(filtered, cand),
    )

dask.visualize(results(muons[0]), optimize_graph=True)  # dask.compute always optimizes the graph

Now lets reduce the histogram results by using the dask bag [fold](https://docs.dask.org/en/latest/bag-api.html#dask.bag.Bag.fold) method and save our reduced dataset to parquet files using the dask dataframe [to_parquet](https://docs.dask.org/en/latest/dataframe-api.html?#dask.dataframe.to_parquet) after some additional filtering to only consider Z boson candidates (i.e. in a mass window 60-120 GeV).

We do have to work a bit harder to construct a dask bag from delayed objects since it expects lists of items, and our results are `hist.Hist` objects. By adding a simple intermediate function `to_list` to group results together, we can then use [from_delayed](https://docs.dask.org/en/latest/bag-api.html#dask.bag.from_delayed). It may seem that `from_sequence` might do what we want, but be warned this will not properly treat sequences of delayed objects!

In [None]:
import dask.dataframe as dd
import dask.bag as db
from operator import add

@delayed(pure=True)
def to_list(*args):
    return list(args)


result_set = np.array([results(chunk) for chunk in muons[:10]])
nmuons_final = db.from_delayed(
    to_list(*result_set[start:stop, 0])
    for start, stop in chunks(len(result_set), 4)
).fold(add)
mass_final = db.from_delayed(
    to_list(*result_set[start:stop, 1])
    for start, stop in chunks(len(result_set), 4)
).fold(add)
table_full = dd.from_delayed(result_set[:, 2])
table_skim = (
    table_full[abs(table_full.mass - 90.0) < 30.0]
    .repartition(2)
    .to_parquet("zmmtable", compute=False)
)

In [None]:
dask.visualize((nmuons_final, mass_final, table_skim), optimize_graph=True)

While this computation is running, we'll explore the dask dashboard (URL listed in `client` repr at the top of the notebook)
This can also be seen from the jupyter lab by pasting the URL into the labextension pane.

In [None]:
res, = dask.compute((nmuons_final, mass_final, table_skim))

In [None]:
fig, ax = plt.subplots()
res[0].plot(ax=ax)
ax.set_xlabel("Number of muons")
ax.set_ylabel("Event counts")

In [None]:
fig, ax = plt.subplots()
res[1].plot(ax=ax)
ax.set_xscale("log")
ax.set_xlabel("Di-muon invariant mass [GeV]")
ax.set_ylabel("Event counts")

In [None]:
pd.read_parquet("zmmtable")

Before closing our client, take a look at the memory usage.
We only hold a persistent reference to `muons[0]` at this point, yet the memory usage is high. After restarting, it drops.
The main culprit in this instance is not leaks but rather the way that memory allocations work: memory is requested from the operating system by worker processes to satisfy peak usage, and is not relinquished immediately in all cases.

In [None]:
client.restart()

In [None]:
client.close()