# Demo: `DAOD_PHYSLITE` analysis with uproot/awkward on jupyterhub on GCP

<div class="alert alert-info">
Note: This tutorial is targeted at users interested in R&D and technical details. Much of this is still in early development/prototyping.
</div>

The image that runs on jupyterhub and the dask workers is defined by the following Dockerfile:

https://github.com/gcp4hep/analysis-cluster/blob/16fb374fe26948081cf3f3b02117d05366d96520/daskhub/docker/jupyter-physlite/Dockerfile

## Read and process PHYSLITE using uproot/awkward

First, let's start with some general notes on reading `DAOD_PHYSLITE`.

The PHYSLITE ROOT files currently follow a structure similar to regular ATLAS xAODs.

They contain several trees, where the one holding the actual data is called `CollectionTree`. The others contain various forms of metadata.

In [None]:
import uproot
import awkward as ak
import numpy as np
import matplotlib.pyplot as plt

In [None]:
f = uproot.open("data/DAOD_PHYSLITE_21.2.108.0.art.pool.root")

In [None]:
f.keys()

### 1-D vectors
* All branches are stored with the **highest split level**
* In most cases, data is stored in branches called `Aux.<something>` or `AuxDyn.<something>`
* These are typically **vectors of fundamental types**, e.g. pt/eta/phi of particle collections
* They **can be read into numpy arrays efficiently using uproot**, since the data is stored in contiguous blocks  
(except for the 10-byte vector headers, whose positions are known from ROOT's event offsets)
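The following pure-Python sketch illustrates why the 10-byte headers are no obstacle once the entry offsets are known. It assumes a simplified big-endian layout per entry (a 4-byte byte count carrying ROOT's `0x40000000` flag, a 2-byte version, a 4-byte element count, then the `float` payload). The version number is a placeholder, and `serialize_entries`/`deserialize_entries` are hypothetical helpers, not part of uproot.

```python
import struct

HEADER_BYTES = 10  # 4-byte byte count + 2-byte version + 4-byte element count


def serialize_entries(entries, version=9):
    """Build a toy basket: one header-prefixed vector<float> per entry, big-endian."""
    data = bytearray()
    offsets = [0]
    for values in entries:
        payload = struct.pack(f">I{len(values)}f", len(values), *values)
        # The byte count covers everything after itself and carries the 0x40000000 flag
        data += struct.pack(">IH", (2 + len(payload)) | 0x40000000, version)
        data += payload
        offsets.append(len(data))
    return bytes(data), offsets


def deserialize_entries(data, offsets):
    """Skip each entry's 10-byte header and read its float payload in one go."""
    counts = []
    content = []
    for start, stop in zip(offsets[:-1], offsets[1:]):
        body = data[start + HEADER_BYTES:stop]
        n = len(body) // 4
        content.extend(struct.unpack(f">{n}f", body))
        counts.append(n)
    return counts, content
```

Since every header has the same size, removing them is a fixed-stride operation; uproot can perform the equivalent step with vectorized NumPy operations on whole baskets rather than a Python loop, which is what makes these branches fast to read.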

In [None]:
f["CollectionTree"].show("/AnalysisElectronsAuxDyn.(pt|eta|phi)$/i", name_width=30, interpretation_width=50)

### ElementLinks

The most relevant exception to this are `ElementLink` branches, which:

* provide cross references into other collections
* are **often 2-dimensional** (`vector<vector<ElementLink<...>>>`)
* serialize the data part (`ElementLink`) as a **structure of two 32-bit unsigned integers**:
  * hash `m_persKey`, identifying the target collection
  * index `m_persIndex` identifying the array-index of the corresponding particle in the target collection.
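To make the `(m_persKey, m_persIndex)` pair concrete, here is a minimal sketch that splits a big-endian buffer of such pairs and follows each link into its target collection. The hash value and the mapping from hash to collection name are made up for illustration, and the per-vector ROOT headers a real branch contains are ignored here.

```python
import struct

# Hypothetical hash -> collection mapping; real m_persKey values identify the target collection
KEY_TO_COLLECTION = {0x1A2B3C4D: "TrackParticles"}


def decode_links(buffer):
    """Split a big-endian buffer of (m_persKey, m_persIndex) uint32 pairs into two lists."""
    n_links = len(buffer) // 8
    flat = struct.unpack(f">{2 * n_links}I", buffer)
    return list(flat[0::2]), list(flat[1::2])


def resolve(keys, indices, collections):
    """Follow each link: the key selects the collection, the index selects the element."""
    return [
        collections[KEY_TO_COLLECTION[key]][index]
        for key, index in zip(keys, indices)
    ]
```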

In [None]:
f["CollectionTree/AnalysisElectronsAuxDyn.trackParticleLinks"].typename

In [None]:
for element in f.file.streamer_named("ElementLinkBase").elements:
    print(f"{element.member('fName')}: {element.member('fTypeName')}")

Uproot can read this, but the loop that deserializes the data runs in Python and is therefore slow.

This does not matter for this very small file, but becomes important for larger files.

This can be handled by [AwkwardForth](https://doi.org/10.1051/epjconf/202125103002), which, however, is not yet integrated with uproot at the time of writing (November 2021).
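The following sketch shows roughly what such a deserialization loop looks like for a `vector<vector<ElementLink>>` branch: nested lists of links become two offset lists plus flat columns. It assumes a simplified layout without ROOT's headers (per entry: a `uint32` count of inner lists, then for each inner list a `uint32` count followed by that many `(key, index)` pairs), and `read_nested_links` is a hypothetical name. Every element passes through the Python interpreter, which is exactly the cost that compiling the loop with numba or expressing it in AwkwardForth avoids.

```python
import struct


def read_nested_links(data, entry_offsets):
    """Pure-Python loop turning nested link lists into offsets + flat columns.

    Simplified layout per entry (an assumption, real baskets also contain headers):
    uint32 n_outer, then per inner list: uint32 n_inner and n_inner (key, index) pairs.
    """
    outer_offsets, inner_offsets = [0], [0]
    keys, indices = [], []
    for start in entry_offsets[:-1]:
        pos = start
        (n_outer,) = struct.unpack_from(">I", data, pos)
        pos += 4
        for _ in range(n_outer):
            (n_inner,) = struct.unpack_from(">I", data, pos)
            pos += 4
            for _ in range(n_inner):
                key, index = struct.unpack_from(">II", data, pos)
                pos += 8
                keys.append(key)
                indices.append(index)
            inner_offsets.append(len(keys))  # end of this particle's links
        outer_offsets.append(len(inner_offsets) - 1)  # end of this event's particles
    return outer_offsets, inner_offsets, keys, indices
```

The two offset lists correspond to the two levels of nesting (events → particles → links), which is the jagged form awkward arrays use.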

For now we can use a custom function `branch_to_array` to do this:

In [None]:
from physlite_experiments.deserialization_hacks import branch_to_array

In [None]:
branch_to_array(f["CollectionTree/AnalysisElectronsAuxDyn.trackParticleLinks"])

Even for this small file with only 40 events, one can already see a significant improvement:

In [None]:
%%timeit
# using standard uproot
f.file.array_cache.clear()
f["CollectionTree/AnalysisElectronsAuxDyn.trackParticleLinks"].array()

In [None]:
%%timeit
# using numba
f.file.array_cache.clear()
branch_to_array(f["CollectionTree/AnalysisElectronsAuxDyn.trackParticleLinks"])

In [None]:
%%timeit
# using awkward forth
f.file.array_cache.clear()
branch_to_array(f["CollectionTree/AnalysisElectronsAuxDyn.trackParticleLinks"], use_forth=True)

## Integration with `coffea.nanoevents`

The PHYSLITE schema and the corresponding behavior classes are still under development - [CoffeaTeam/coffea#540](https://github.com/CoffeaTeam/coffea/issues/540) tracks the progress of some TODO items.

For more information on `NanoEvents`, see the [NanoEvents tutorial](https://github.com/CoffeaTeam/coffea/blob/master/binder/nanoevents.ipynb) or [Nick Smith's presentation](https://youtu.be/udzkE6t4Mck) at [PyHEP 2020](https://indico.cern.ch/event/882824).

<div class="alert alert-block alert-success">
    <b>The Goal:</b>
    <ul>
        <li>Work with object-oriented event data models, but stick to the array-at-a-time processing paradigm.<br> → Struct/Object of arrays instead of Array of structs/objects</li>
        <li>Hide the details from the user</li>
    </ul>
</div>
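As a toy illustration of the "struct of arrays" idea, the following sketch converts per-event lists of particle records into one flat list per field plus an offsets list marking the event boundaries. This is roughly the shape of awkward's layout (a `ListOffsetArray` around a `RecordArray`), although the real implementation is more elaborate. `to_struct_of_arrays` is a hypothetical name, and it assumes all records share the same fields.

```python
def to_struct_of_arrays(events):
    """Turn per-event lists of particle dicts (array of structs) into
    per-field flat lists plus event offsets (struct of arrays)."""
    offsets = [0]
    fields = {}
    for particles in events:
        for particle in particles:
            for name, value in particle.items():
                fields.setdefault(name, []).append(value)
        # offsets[i]:offsets[i + 1] selects the particles of event i in every field list
        offsets.append(offsets[-1] + len(particles))
    return offsets, fields
```

A selection such as `pt > 20` then becomes a single pass over `fields["pt"]`, and the offsets restore the per-event structure afterwards.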

In [None]:
from coffea.nanoevents import NanoEventsFactory, PHYSLITESchema

# patch nanoevents to use the custom branch_to_array function
from physlite_experiments.deserialization_hacks import patch_nanoevents
patch_nanoevents()

In [None]:
factory = NanoEventsFactory.from_root(
    "data/DAOD_PHYSLITE_21.2.108.0.art.pool.root",
    "CollectionTree",
    schemaclass=PHYSLITESchema
)
events = factory.events()

This conveniently groups particles and their available properties under one central `events` array:

* everything is loaded lazily
* cross-referencing via ElementLinks is already implemented for some collections
* particles behave as LorentzVectors (one can add them, calculate invariant masses and much more)

See [my tutorial at the IRIS-HEP AGC tools workshop 2021](https://github.com/nikoladze/agc-tools-workshop-2021-physlite) for more technical details.
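For scalar values, adding LorentzVectors and computing an invariant mass amounts to the following sketch: convert `(pt, eta, phi, m)` to Cartesian components, sum them, and take `m^2 = E^2 - |p|^2`. The behaviors in `coffea.nanoevents` do this array-at-a-time; the function names here are hypothetical, and the units are whatever the inputs use (ATLAS xAODs use MeV).

```python
import math


def to_cartesian(pt, eta, phi, mass):
    """Convert (pt, eta, phi, m) to (E, px, py, pz)."""
    px = pt * math.cos(phi)
    py = pt * math.sin(phi)
    pz = pt * math.sinh(eta)
    energy = math.sqrt(px**2 + py**2 + pz**2 + mass**2)
    return energy, px, py, pz


def invariant_mass(*particles):
    """Invariant mass of the summed four-vectors of (pt, eta, phi, m) tuples."""
    # Sum each Cartesian component over all particles
    e, px, py, pz = (sum(c) for c in zip(*(to_cartesian(*p) for p in particles)))
    # m^2 = E^2 - |p|^2; clip tiny negative values caused by rounding
    return math.sqrt(max(e**2 - px**2 - py**2 - pz**2, 0.0))
```

For example, two massless particles with equal `pt` emitted back-to-back in the transverse plane give an invariant mass of twice their `pt`.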

In [None]:
events.Electrons

In [None]:
events.Electrons.fields

In [None]:
events.Electrons.trackParticles

In [None]:
events.Electrons.trackParticles.z0

In [None]:
events.Electrons[events.Electrons.pt > 10000].trackParticles

In [None]:
events.TruthElectrons.parents

In [None]:
events.TruthElectrons.parents.children.pdgId

In [None]:
events.TruthElectrons.parents.children.pdgId.ndim

## Read data via HTTPS from google cloud storage (authentication via rucio)

We will use the following functions to authenticate to rucio and get signed urls on google cloud storage (GCS).

In [None]:
from utils import setup_rucio_and_proxy, get_signed_url, get_signed_url_worker

For that we have to provide a VOMS proxy. To avoid needing the grid certificate and the VOMS tools on this jupyterhub instance, we create the VOMS proxy elsewhere (on some machine where we have the VOMS tools and our grid certificate) and upload it to this notebook:

In [None]:
from ipywidgets import FileUpload
upload = FileUpload()
display(upload)

Also, fill in your CERN account name here:

In [None]:
RUCIO_ACCOUNT="nihartma"

Then we set up the necessary environment variables:

In [None]:
setup_rucio_and_proxy(upload.data[-1], rucio_account=RUCIO_ACCOUNT)
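The code of `setup_rucio_and_proxy` lives in `utils.py` and is not shown here. As a hypothetical sketch of what such a step could involve, the following writes the uploaded proxy bytes to a file readable only by its owner and points the `X509_USER_PROXY` and `RUCIO_ACCOUNT` environment variables at it. The function name and file name are assumptions, and a real setup also needs a rucio configuration (server URLs, authentication type), which is omitted here.

```python
import os
import tempfile


def setup_proxy_env(proxy_bytes, rucio_account, directory=None):
    """Hypothetical stand-in for setup_rucio_and_proxy: store the proxy privately
    and export the environment variables that grid/rucio clients read."""
    directory = directory or tempfile.gettempdir()
    path = os.path.join(directory, f"x509up_{rucio_account}")  # assumed file name
    # Create the file with owner-only permissions, as proxy files are expected to have
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "wb") as handle:
        handle.write(proxy_bytes)
    os.chmod(path, 0o600)  # also fix permissions if the file already existed
    os.environ["X509_USER_PROXY"] = path
    os.environ["RUCIO_ACCOUNT"] = rucio_account
    return path
```

The analogous call here would be `setup_proxy_env(upload.data[-1], RUCIO_ACCOUNT)`.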

Now we should be able to query rucio:

In [None]:
import rucio.client
rucio_client = rucio.client.Client(ca_cert=False)

*Note: we should probably install the CERN CA files into the container in the future such that we don't need to run with `ca_cert=False`*

Let's get a list of all files in one data period, corresponding to around 10% of the whole Run 2 data (around 10 TB in total):

In [None]:
files = list(rucio_client.list_files("data17_13TeV", "data17_13TeV.periodK.physics_Main.PhysCont.DAOD_PHYSLITE.grp17_v01_p4309"))

In [None]:
files[0]

In [None]:
sum(file["bytes"] for file in files) / 1024 ** 4
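Note that dividing by `1024 ** 4` gives tebibytes (TiB); decimal terabytes (10^12 bytes) come out about 10% larger. A small hypothetical helper makes the difference explicit:

```python
def size_in_units(n_bytes):
    """Return (tebibytes, terabytes) for a byte count."""
    tebibytes = n_bytes / 1024**4  # binary prefix, as in the cell above
    terabytes = n_bytes / 1000**4  # decimal prefix
    return tebibytes, terabytes
```

Applied to the file list, this would be `size_in_units(sum(file["bytes"] for file in files))`.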

The full Run 2 dataset is replicated to GCS. To access it via HTTPS, we can ask rucio for a signed URL. Uproot can deal with HTTP(S) URLs directly:

In [None]:
url = get_signed_url(rucio_client, files[0]["scope"], files[0]["name"])

In [None]:
f_remote = uproot.open(url)

In [None]:
f_remote["CollectionTree/AnalysisElectronsAuxDyn.pt"].array()

Some notes on this:

* GCS does not support multi-range requests (the equivalent of xrootd vector reads); only single-range requests are allowed
* Single-range requests with uproot's `MultithreadedHTTPSource` perform suboptimally
* GCS seems fine with a huge number of parallel requests, which can be issued with asyncio
* However, downloading the whole file is often still faster than asynchronously reading partial chunks (but needs a lot of memory)
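Since GCS only accepts single-range requests, one way to reduce the number of round trips is to merge nearby byte ranges (e.g. neighbouring baskets) into fewer, larger single-range requests, at the cost of also downloading the bytes in the gaps. A minimal sketch with hypothetical helper names, not taken from uproot:

```python
def coalesce(ranges, max_gap=0):
    """Merge half-open byte ranges [start, stop) separated by at most max_gap bytes."""
    merged = []
    for start, stop in sorted(ranges):
        if merged and start - merged[-1][1] <= max_gap:
            merged[-1][1] = max(merged[-1][1], stop)  # extend the previous range
        else:
            merged.append([start, stop])
    return [tuple(r) for r in merged]


def range_header(start, stop):
    """HTTP Range header for one half-open range (HTTP byte ranges are inclusive)."""
    return {"Range": f"bytes={start}-{stop - 1}"}
```

The `max_gap` parameter trades wasted bytes against the number of requests; each merged range then becomes one request with its own `Range` header.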

In [None]:
import requests

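def download(url):
    """Download the whole file into memory and return its bytes."""
    response = requests.get(url)
    response.raise_for_status()  # fail loudly on HTTP errors, e.g. an expired signed URL
    return response.content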

In [None]:
data = download(url)

In [None]:
import io

uproot.open(io.BytesIO(data))["CollectionTree/AnalysisElectronsAuxDyn.pt"].array()

I have an experimental implementation of an asyncio-based HTTPSource for uproot (this should probably become a PR for uproot at some point; alternatively, one could use an interface to fsspec, whose `cat_ranges` method might be used for this).

GCS seems fine with 100 parallel TCP connections (even per worker on a larger cluster):

In [None]:
from physlite_experiments.io import AIOHTTPSource

class AIOHTTP100Source(AIOHTTPSource):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, tcp_connection_limit=100, **kwargs)

In [None]:
uproot.open(url, http_handler=AIOHTTP100Source)["CollectionTree/AnalysisElectronsAuxDyn.pt"].array()
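The internals of `AIOHTTPSource` are not shown here. As a generic sketch of the underlying idea, the following issues many requests concurrently with asyncio while an `asyncio.Semaphore` caps the number in flight at `limit` (100 above); `fake_fetch` stands in for a real HTTP range request. With aiohttp one would more likely cap connections via `aiohttp.TCPConnector(limit=...)`; that this is what `tcp_connection_limit` controls is an assumption.

```python
import asyncio


async def fetch_all(ranges, fetch, limit=100):
    """Run fetch(start, stop) for every range, with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def bounded(start, stop):
        async with semaphore:
            return await fetch(start, stop)

    # gather returns the results in the order of `ranges`
    return await asyncio.gather(*(bounded(start, stop) for start, stop in ranges))


async def demo(n_ranges=250, limit=100):
    """Exercise fetch_all with a fake fetch that records the peak concurrency."""
    state = {"in_flight": 0, "peak": 0}

    async def fake_fetch(start, stop):
        # stand-in for an HTTP range request
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.001)
        state["in_flight"] -= 1
        return stop - start

    ranges = [(10 * i, 10 * i + 10) for i in range(n_ranges)]
    sizes = await fetch_all(ranges, fake_fetch, limit)
    return sizes, state["peak"]
```

In a notebook, where an event loop is already running, use `await demo()` instead of `asyncio.run(demo())`.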

## Run an actual analysis with this

In [None]:
import warnings
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    factory = NanoEventsFactory.from_root(
        url,
        "CollectionTree",
        schemaclass=PHYSLITESchema,
        uproot_options=dict(http_handler=AIOHTTP100Source),
    )

In [None]:
events = factory.events()

We are going to use the example from my [vCHEP presentation](https://doi.org/10.1051/epjconf/202125103001), which tries to reproduce some "[SUSYTools](https://gitlab.cern.ch/atlas/athena/tree/master/PhysicsAnalysis/SUSYPhys/SUSYTools)-like" object selections and overlap removal for electrons, muons and jets:

In [None]:
from physlite_experiments.analysis_example import get_obj_sel

In [None]:
events_decorated = get_obj_sel(events)

This creates the fields `passOR`, `signal` and `baseline`, which indicate which objects pass these selections.

In [None]:
events_decorated.Electrons.signal

In [None]:
events_decorated.Jets.passOR

Delta-R between electrons and jets without overlap removal:

In [None]:
exx, jxx = ak.unzip(ak.cartesian([events_decorated.Electrons, events_decorated.Jets]))
plt.hist(ak.flatten(exx.delta_r(jxx)).to_numpy(), bins=100);

With overlap removal:

In [None]:
electrons_pass = events_decorated.Electrons[events_decorated.Electrons.passOR]
jets_pass = events_decorated.Jets[events_decorated.Jets.passOR]
exx, jxx = ak.unzip(ak.cartesian([electrons_pass, jets_pass]))
plt.hist(ak.flatten(exx.delta_r(jxx)).to_numpy(), bins=100);
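The actual logic in `get_obj_sel` (following SUSYTools) covers more object types and criteria than shown here. As a simplified sketch of a single electron/jet overlap-removal step for one event: jets close to an electron are dropped first, then electrons close to a surviving jet. The cone sizes (0.2 and 0.4) and the function names are assumptions for illustration; `delta_r` wraps the azimuthal difference into [-π, π).

```python
import math


def delta_r(a, b):
    """Delta R between two objects given as dicts with 'eta' and 'phi'."""
    dphi = (a["phi"] - b["phi"] + math.pi) % (2 * math.pi) - math.pi  # wrap into [-pi, pi)
    return math.hypot(a["eta"] - b["eta"], dphi)


def overlap_removal(electrons, jets, jet_cone=0.2, electron_cone=0.4):
    """Simplified electron/jet overlap removal for a single event.

    1. drop jets within jet_cone of any electron
    2. drop electrons within electron_cone of any surviving jet
    Returns passOR-like flags for electrons and jets.
    """
    jets_pass = [all(delta_r(j, e) >= jet_cone for e in electrons) for j in jets]
    surviving = [j for j, ok in zip(jets, jets_pass) if ok]
    electrons_pass = [
        all(delta_r(e, j) >= electron_cone for j in surviving) for e in electrons
    ]
    return electrons_pass, jets_pass
```

Removing close-by pairs in this way is why one would expect the small-ΔR region to be depleted in the histogram with overlap removal.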

For this simple demonstration, let's just count the number of objects passing the criteria:

In [None]:
def run_analysis(events):
    events = get_obj_sel(events)
    # count the objects passing each selection flag, summed over all events
    return {
        collection: {
            flag: ak.count_nonzero(events[collection][flag])
            for flag in ["baseline", "passOR", "signal"]
        }
        for collection in ["Electrons", "Muons", "Jets"]
    }

In [None]:
def merge(results):
    out = {
        collection: {
            flag: 0
            for flag in ["baseline", "passOR", "signal"]
        } for collection in ["Electrons", "Muons", "Jets"]
    }
    for result in results:
        for collection, flags in result.items():
            for flag, count in flags.items():
                out[collection][flag] += count
    return out
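The `merge` function above hard-codes the collection and flag names, which guarantees that all keys appear in the output even when there are no results. An alternative sketch (hypothetical name) that adapts to whatever keys the results contain, using `collections.Counter`:

```python
from collections import Counter


def merge_nested(results):
    """Sum {collection: {flag: count}} dicts without hard-coding collection or flag names."""
    totals = {}
    for result in results:
        for collection, flags in result.items():
            # Counter.update adds counts instead of replacing them
            totals.setdefault(collection, Counter()).update(flags)
    return {collection: dict(counts) for collection, counts in totals.items()}
```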

In [None]:
result = run_analysis(events)

In [None]:
result

## Run on a dask cluster

Now we want to run this function in parallel on a large number of files. This jupyterhub instance also features a [dask-gateway](https://gateway.dask.org/) that allows us as a user to dynamically request a cluster on GCP nodes.

Some notes:

* All Python modules need to be installed in the container running on the dask workers
* We don't have a shared filesystem on GCP, so all data has to be accessed from GCS (Google Cloud Storage)

Also see Fernando's instructions at https://github.com/gcp4hep/analysis-cluster/wiki/Daskhub-usage

One can either create the cluster here, or in another notebook or terminal. We will choose the latter option and create and manage it in the notebook [`manage_cluster.ipynb`](manage_cluster.ipynb) and then connect here:

In [None]:
from dask_gateway import Gateway
gateway = Gateway()
clusters = gateway.list_clusters()
clusters

In [None]:
cluster = gateway.connect(clusters[0].name)

Drag and drop the dashboard URL from the following cell into the dask plugin panel on the left:

In [None]:
cluster

In [None]:
#cluster.shutdown()

In [None]:
client = cluster.get_client()

In [None]:
client

The `utils.py` module is not part of the Docker image, so we need to make sure the workers get it. One way to do this is the `UploadFile` worker plugin:

In [None]:
from distributed.diagnostics.plugin import UploadFile

client.register_worker_plugin(UploadFile("utils.py"))

<div class="alert alert-info">
Unfortunately, I currently see a lot of memory issues when running in parallel with <code>coffea.nanoevents</code>, so for this part we use an older prototype, where these issues seem to be less severe.
</div>

One can also use [`coffea.processor`](https://github.com/CoffeaTeam/coffea/blob/master/binder/processor.ipynb) to run in parallel, but this, too, requires some work to get running with all the custom pieces in this notebook (namely the custom uproot source and the retrieval of signed URLs).

In [None]:
from physlite_experiments.physlite_events import physlite_events
from physlite_experiments.utils import subdivide

In [None]:
import math

def run_old(url, max_chunksize=100000):
    # shared cache, cleared after each chunk to keep memory usage bounded
    array_cache = {}
    with uproot.open(
        url, http_handler=AIOHTTP100Source, array_cache=array_cache
    ) as f:
        tree = f["CollectionTree"]
        entry_start = 0
        results = []
        n = tree.num_entries
        # split the n entries into ceil(n / max_chunksize) chunks
        for chunksize in subdivide(n, math.ceil(n / max_chunksize)):
            entry_stop = entry_start + chunksize
            events = physlite_events(
                tree, entry_start=entry_start, entry_stop=entry_stop
            )
            entry_start = entry_stop
            results.append(run_analysis(events))
            array_cache.clear()
    # combine the per-chunk counts into one result for this file
    return merge(results)

In [None]:
run_old(url)
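`subdivide` comes from `physlite_experiments.utils` and is not shown here. Assuming it returns `k` chunk sizes that add up to `n` (which is how `run_old` uses it), a hypothetical equivalent that keeps the chunks as even as possible, plus the resulting `(entry_start, entry_stop)` pairs, could look like this:

```python
import math
from itertools import accumulate


def split_evenly(n, k):
    """Hypothetical stand-in for subdivide(n, k): k sizes summing to n, differing by at most 1."""
    base, remainder = divmod(n, k)
    return [base + 1] * remainder + [base] * (k - remainder)


def entry_ranges(n, max_chunksize):
    """(entry_start, entry_stop) pairs covering n entries in chunks of at most max_chunksize."""
    k = max(1, math.ceil(n / max_chunksize))  # guard against n == 0
    stops = list(accumulate(split_evenly(n, k)))
    starts = [0] + stops[:-1]
    return list(zip(starts, stops))
```

Even chunks avoid a tiny last chunk, while the `ceil` keeps every chunk at or below `max_chunksize`.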

We have to extract the x509 proxy data so that it gets serialized together with the function that runs on the workers. Alternatively, one could use the `UploadFile` worker plugin here as well.

In [None]:
x509_data = upload.data[-1]

In [None]:
def job(fileinfo):
    url = get_signed_url_worker(
        x509_data, fileinfo["scope"], fileinfo["name"], rucio_account=RUCIO_ACCOUNT, ca_cert=False
    )
    return run_old(url)

In [None]:
job(files[0])

We're using the `futures` API of dask, which is rather low-level but gives us the most control for these R&D studies. It mimics Python's [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html) module.

The basic principle is: you submit a function and its input arguments and immediately get back a `Future` object:

In [None]:
future = client.submit(job, files[0])

When you call `.result()`, it blocks until the job is finished. One can chain multiple futures to create execution graphs. There is also [`dask.delayed`](https://docs.dask.org/en/stable/delayed.html), which can build such graphs abstractly and execute them on demand (potentially optimizing them). Finally, there are dask collections that provide distributed equivalents of pandas DataFrames or numpy arrays, and [one for awkward arrays is in development](https://dask-awkward.readthedocs.io/en/latest/index.html). In the future, this will allow a higher-level interface for everything we are doing here.

In [None]:
future.result()
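Since the dask futures API mimics `concurrent.futures`, the same pattern can be tried locally with the standard library. In this sketch, `toy_job` is a placeholder for `job`; note that `Executor.map` directly returns the results, whereas `client.map` returns futures that are collected with `client.gather`.

```python
from concurrent.futures import ThreadPoolExecutor


def toy_job(n):
    """Placeholder for `job`: returns a small result dict."""
    return {"n": n, "square": n * n}


with ThreadPoolExecutor(max_workers=4) as executor:
    toy_future = executor.submit(toy_job, 3)  # returns a Future immediately
    toy_first = toy_future.result()  # blocks until the job has finished
    # similar to client.map followed by client.gather, but yields results directly
    toy_results = list(executor.map(toy_job, range(5)))
```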

Now, let's scale the cluster to a larger number of workers (e.g. 128) and run on a subset of files, corresponding to roughly 1TB of data:

In [None]:
subset = files[::10]
sum(info["bytes"] for info in subset) / 1024 ** 4

In [None]:
futures = client.map(job, subset)

The following call will gather all results and block until all jobs are finished:

In [None]:
results = client.gather(futures)

In [None]:
results[:3]

In [None]:
total = merge(results)

In [None]:
total

## Don't forget to shut down the cluster afterwards!

In [None]:
cluster.shutdown()