# dask-awkward: Early Status Demo

**This is a demo of pre-release software!**

The [dask-awkward](https://github.com/ContinuumIO/dask-awkward) project provides a native Dask collection for Awkward Arrays and recreates the Awkward Array API with Dask compatibility.

In [None]:
import awkward._v2 as ak
import dask_awkward as dak
import distributed
import fsspec
import numpy as np  # used later for the invariant mass calculation

We're going to use Dask's distributed execution engine so we can see the computation in action with either the [distributed dashboard](https://docs.dask.org/en/stable/diagnostics-distributed.html) or Dask's [JupyterLab extension](https://github.com/dask/dask-labextension).

In [None]:
client = distributed.Client()
client

## Toy dataset

For the demo we'll use a dataset of Higgs bosons decaying to two $Z$ bosons, each of which decays leptonically to muons ($H\rightarrow ZZ \rightarrow\mu\mu\mu\mu$). In this simulated physics sample, not all four muons are always detected, so the data are non-rectilinear. We have 50 compressed files containing line-delimited JSON records (one record is one event, many records per file). We'll use a wildcard to grab all of them.
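
To make the file format concrete, here is a minimal sketch of line-delimited JSON with a variable number of muons per event, written to and read from an in-memory gzip buffer using only the standard library. The field names follow the ones used in this demo (`muons`, `pt`, `eta`, `phi`, `charge`); the values are made up for illustration and do not come from the real sample.

```python
import gzip
import io
import json

# Made-up events: each record is one event with a variable-length muon list.
events = [
    {"muons": [{"pt": 40.1, "eta": 0.3, "phi": 1.2, "charge": 1},
               {"pt": 35.7, "eta": -0.8, "phi": -2.0, "charge": -1}]},
    {"muons": []},  # no muons detected in this event
    {"muons": [{"pt": 22.4, "eta": 1.1, "phi": 0.4, "charge": -1}]},
]

# Write one JSON document per line into a gzip-compressed in-memory buffer.
buffer = io.BytesIO()
with gzip.open(buffer, "wt", encoding="utf-8") as f:
    for event in events:
        f.write(json.dumps(event) + "\n")

# Read it back line by line, parsing each line as one event.
buffer.seek(0)
with gzip.open(buffer, "rt", encoding="utf-8") as f:
    records = [json.loads(line) for line in f]

# The number of muons differs from event to event (ragged data).
muon_counts = [len(record["muons"]) for record in records]
print(muon_counts)
```

Because every line is a complete JSON document, each file can be parsed independently of the others.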

dask-awkward currently supports reading JSON files via `dak.from_json`.

First we'll eagerly load a small part of the dataset to see what data we're working with:

In [None]:
import ujson as json

single_file = "./data/higgs.000.json.gz"
with fsspec.open(single_file, compression="infer") as f:
    eager = ak.from_iter([json.loads(line) for line in f])

In [None]:
eager

In [None]:
eager[10].tolist()

In [None]:
eager[ak.num(eager.muons, axis=1) == 4]

Now let's stage loading the whole dataset with dask-awkward.

In [None]:
dataset = dak.from_json("./data/higgs*json.gz")

In [None]:
dataset

We see that we have a dask-awkward collection containing 50 partitions (one for each file).

## High-level analysis example

We'll build up a task graph with the following steps:
- Check how many muons are in each event.
- Create a boolean array for events with exactly two opposite-sign muons.
- Grab events from the original dataset passing that selection.
- Calculate the invariant mass of the pair.
- Histogram that value.
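
The first three steps can be sketched on plain Python lists to show the logic without any Dask machinery. This is only an illustration on made-up records, not how dask-awkward evaluates the selection; the `events` list and its values are hypothetical.

```python
# Made-up events; only the charge matters for this selection.
events = [
    {"muons": [{"charge": 1}, {"charge": -1}]},                 # passes
    {"muons": [{"charge": 1}, {"charge": 1}]},                  # same sign
    {"muons": [{"charge": -1}]},                                # one muon
    {"muons": [{"charge": 1}, {"charge": -1}, {"charge": 1}]},  # three muons
]

# Step 1: count the muons in each event.
n_muons = [len(event["muons"]) for event in events]

# Step 2: boolean mask for events with exactly two muons of opposite charge.
mask = [
    n == 2 and event["muons"][0]["charge"] + event["muons"][1]["charge"] == 0
    for n, event in zip(n_muons, events)
]

# Step 3: keep only the events passing the selection.
selected = [event for event, keep in zip(events, mask) if keep]

print(n_muons)
print(mask)
```

In the notebook the same three steps are expressed as array operations, so nothing is evaluated until compute time.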

`dak.num` is the dask-awkward version of `ak.num`.

In [None]:
# check the total number of muons in each event
n_muons = dak.num(dataset.muons, axis=1)

In [None]:
n_muons.visualize()

Much of the Awkward slicing/getitem API is supported; we'll use that to define the selection.

In [None]:
n_muons.dask.visualize()

In [None]:
n_muons.dask.visualize(color="layer_type")

In [None]:
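# keep only events with exactly two muons
muon_pairs = dataset.muons[n_muons == 2]
# the two charges sum to zero when the muons have opposite sign
oppo_sign = (muon_pairs.charge[:, 0] + muon_pairs.charge[:, 1]) == 0
selected_muons = muon_pairs[oppo_sign]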

In [None]:
selected_muons.visualize()

In [None]:
selected_muons.dask.visualize(color="layer_type")

In [None]:
mu1 = selected_muons[:, 0]
mu2 = selected_muons[:, 1]

In [None]:
m = np.sqrt(
    mu1.pt * mu2.pt * 2 * (np.cosh(mu1.eta - mu2.eta) - np.cos(mu1.phi - mu2.phi))
)
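
The expression above is the invariant mass of two particles whose own masses are negligible compared with their momenta, $m^2 = 2\,p_{T,1}\,p_{T,2}\,(\cosh\Delta\eta - \cos\Delta\phi)$. As a quick sanity check, here is a scalar version using only the `math` module; the function name `pair_mass` and the input values are made up for illustration.

```python
import math

def pair_mass(pt1, eta1, phi1, pt2, eta2, phi2):
    """Invariant mass of two particles, neglecting their individual masses."""
    return math.sqrt(
        2.0 * pt1 * pt2 * (math.cosh(eta1 - eta2) - math.cos(phi1 - phi2))
    )

# Two back-to-back muons at the same pseudorapidity: cosh(0) - cos(pi) = 2,
# so the mass reduces to 2 * pt.
mass = pair_mass(45.0, 0.5, 0.0, 45.0, 0.5, math.pi)
print(round(mass, 6))
```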

In [None]:
m.dask.visualize()

In [None]:
import dask_histogram as dh
import boost_histogram as bh

h = dh.histogram(
    m,
    bins=120,
    range=(0, 120),
    histogram=bh.Histogram,
)
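
One reason histogramming suits a partitioned collection is that fixed-width histograms are additive: each partition can be binned independently and the per-bin counts summed afterwards. The sketch below shows that idea in plain Python with the same binning as above (120 bins over 0 to 120). The helper `fill` and the sample values are hypothetical, and this is not a description of dask-histogram's internals.

```python
def fill(values, bins=120, lo=0.0, hi=120.0):
    """Count values into equal-width bins over [lo, hi); drop anything outside."""
    counts = [0] * bins
    width = (hi - lo) / bins
    for value in values:
        if lo <= value < hi:
            counts[int((value - lo) / width)] += 1
    return counts

# Made-up masses split across two "partitions".
partition_a = [90.2, 91.0, 3.1]
partition_b = [91.4, 125.0, 89.9]

# Histogram each partition separately, then add the counts bin by bin.
merged = [a + b for a, b in zip(fill(partition_a), fill(partition_b))]

# Same result as histogramming everything at once.
assert merged == fill(partition_a + partition_b)
print(merged[89], merged[90], merged[91])
```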

In [None]:
h.visualize(optimize_graph=True)

In [None]:
import hist
h = hist.Hist(h.compute())
h.plot1d()

## Some other features, looking ahead.

We'd like to fail as quickly as possible (i.e., before compute time). When we know enough about the expected structure of the concrete Awkward Array that would result from the dask-awkward collection, we can do that.
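
The idea of validating against known structure before computing can be illustrated with a toy class. This is a hypothetical sketch, not dask-awkward's typetracer implementation: `LazyRecords` only stores field names and a deferred loader, and it rejects unknown fields when the selection is built rather than when it is computed.

```python
class LazyRecords:
    """Toy lazy collection that knows its field names before loading any data."""

    def __init__(self, fields, loader):
        self.fields = list(fields)
        self._loader = loader  # called only by compute()

    def __getitem__(self, names):
        # Validate against the known fields while the selection is being built.
        missing = [name for name in names if name not in self.fields]
        if missing:
            raise KeyError(f"unknown fields: {missing}")
        loader = self._loader
        return LazyRecords(
            names,
            lambda: [{name: row[name] for name in names} for row in loader()],
        )

    def compute(self):
        return self._loader()


load_calls = []

def load():
    load_calls.append(1)  # record that data was actually read
    return [{"pt": 40.1, "eta": 0.3, "phi": 1.2, "charge": 1}]

muons = LazyRecords(["pt", "eta", "phi", "charge"], load)

try:
    muons[["pt", "eta", "energy"]]
except KeyError as err:
    print("failed before compute:", err)

print(load_calls)  # still empty: nothing was loaded
print(muons[["pt", "eta"]].compute())
```

The invalid selection raises without ever calling `load`, which is the behaviour the cells below demonstrate on the real collection.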

In [None]:
dataset.fields

In [None]:
dataset.muons.fields

In [None]:
dataset.muons[["pt", "eta", "energy"]]  # will raise _before_ compute time thanks to the typetracer knowing the fields.

In [None]:
dataset.muons[["pt", "eta"]] # this works

In [None]:
selection1 = dataset.muons.charge[:, 0] + dataset.muons.charge[:, 1] == 0