In [23]:
from coffea.nanoevents import NanoEventsFactory
from coffea.nanoevents.schemas import PFNanoAODSchema
import dask

def open_events(path="file:./pfnano.root", treename="Events"):
    """Open a ROOT file as a NanoEvents collection using ``PFNanoAODSchema``.

    Parameters
    ----------
    path : str, optional
        File location handed to ``NanoEventsFactory.from_root``. Defaults to
        the local ``pfnano.root`` sample used throughout this notebook.
    treename : str, optional
        Name of the tree to read from ``path``. Defaults to ``"Events"``.

    Returns
    -------
    The object returned by ``factory.events()``. With the default arguments
    it is displayed in this notebook as a ``dask.awkward`` collection.
    """
    factory = NanoEventsFactory.from_root(
        {path: treename},
        schemaclass=PFNanoAODSchema,
    )
    return factory.events()

In [44]:
# Display the object returned by open_events(); the repr below shows a
# dask.awkward collection rather than loaded data.
open_events()

dask.awkward<from-uproot, npartitions=1>

In [2]:
from coffea.ml_tools.torch_wrapper import torch_wrapper
import awkward as ak
import dask_awkward
import numpy as np

class ParticleNetExample1(torch_wrapper):
    """Model wrapper that converts jets and their PF constituents to inputs."""

    def prepare_awkward(self, events):
        """Assemble the positional and keyword arguments for the model call.

        Each per-constituent quantity is padded with zeros (or clipped) to 100
        entries per jet, the quantities belonging to one input group are
        stacked along a new axis 1, and every stacked group is cast to the
        dtype passed to the model.

        Parameters
        ----------
        events
            Event collection exposing ``Jet`` with ``constituents.pf``.

        Returns
        -------
        tuple
            ``((), kwargs)`` -- no positional arguments, and keyword arguments
            ``"points"``, ``"features"`` and ``"mask"``.
        """
        flat_jets = ak.flatten(events.Jet)
        cands = flat_jets.constituents.pf

        def fixed_width(jagged):
            # Pad or clip axis 1 to 100 entries, then turn the padding into 0.
            padded = ak.pad_none(jagged, 100, axis=1, clip=True)
            return ak.fill_none(padded, 0.0)

        def stack(columns):
            # Give every (jets x 100) array a length-1 axis and join along it.
            return ak.concatenate(
                [column[:, np.newaxis, :] for column in columns],
                axis=1,
            )

        # Human-readable description of the inputs. Each leaf is an
        # (N jets x 100 constituents) array; the order inside a group is the
        # order in which the quantities are stacked.
        groups = {
            "points": {
                "deta": fixed_width(flat_jets.eta - cands.eta),
                "dphi": fixed_width(flat_jets.delta_phi(cands)),
            },
            "features": {
                "dr": fixed_width(flat_jets.delta_r(cands)),
                "lpt": fixed_width(np.log(cands.pt)),
                "lptf": fixed_width(np.log(cands.pt / flat_jets.pt)),
                "f1": fixed_width(np.log(np.abs(cands.d0) + 1)),
                "f2": fixed_width(np.log(np.abs(cands.dz) + 1)),
            },
            "mask": {
                # 1 for a real constituent, 0 for a padded slot.
                "mask": fixed_width(ak.ones_like(cands.pt)),
            },
        }

        stacked = {name: stack(group.values()) for name, group in groups.items()}

        # dtype that each model keyword argument is converted to.
        dtypes = {"points": "float32", "features": "float32", "mask": "float16"}
        model_kwargs = {
            name: ak.values_astype(stacked[name], dtype)
            for name, dtype in dtypes.items()
        }

        # The model is invoked as model(*args, **kwargs); everything is passed
        # by keyword here, so the positional part is empty.
        return (), model_kwargs


# Setting up the model container from the serialized model file
pn_example1 = ParticleNetExample1("model.pt")

# Running on dask_awkward array: build the event collection and pass it
# through the wrapper. `dask_events` and `dask_results` are reused by later cells.
dask_events = open_events()
dask_results = pn_example1(dask_events)
print("Dask awkward results:", dask_results.compute())  # Runs file!

Issue: coffea.nanoevents.methods.vector will be removed and replaced with scikit-hep vector. Nanoevents schemas internal to coffea will be migrated. Otherwise please consider using that package!.
  from coffea.nanoevents.methods import vector


Dask awkward results: [[0.0693, -0.0448], [0.0678, -0.0451], ..., [0.0616, ...], [0.0587, -0.0172]]




In [14]:
# Flatten the per-event jet lists into a single list of jets, mirroring the
# first step of prepare_awkward so the intermediate arrays can be inspected.
jets = ak.flatten(open_events().Jet)

In [15]:
def pad(arr, length=100, fill_value=0.0):
    """Pad or clip ``arr`` along axis 1 to a fixed length and fill the gaps.

    Parameters
    ----------
    arr
        Jagged array whose axis-1 lists are to be made the same length.
    length : int, optional
        Target number of entries along axis 1. Longer lists are clipped,
        shorter ones are padded. Defaults to 100.
    fill_value : optional
        Value substituted for the missing entries created by the padding.
        Defaults to ``0.0``.

    Returns
    -------
    The padded array with missing entries replaced by ``fill_value``.
    """
    return ak.fill_none(
        ak.pad_none(arr, length, axis=1, clip=True),
        fill_value,
    )

In [16]:
# Padded per-constituent inputs, grouped by the model argument they feed.
# The order inside each group is the order used when the group is stacked.
imap = dict(
    points=dict(
        deta=pad(jets.eta - jets.constituents.pf.eta),
        dphi=pad(jets.delta_phi(jets.constituents.pf)),
    ),
    features=dict(
        dr=pad(jets.delta_r(jets.constituents.pf)),
        lpt=pad(np.log(jets.constituents.pf.pt)),
        lptf=pad(np.log(jets.constituents.pf.pt / jets.pt)),
        f1=pad(np.log(np.abs(jets.constituents.pf.d0) + 1)),
        f2=pad(np.log(np.abs(jets.constituents.pf.dz) + 1)),
    ),
    mask=dict(
        # 1 for a real constituent, 0 for a padded slot.
        mask=pad(ak.ones_like(jets.constituents.pf.pt)),
    ),
)

In [19]:
# Stack the arrays of every input group along a new axis 1, one group at a time.
retmap = {
    group_name: ak.concatenate(
        [column[:, np.newaxis, :] for column in columns.values()],
        axis=1,
    )
    for group_name, columns in imap.items()
}

In [28]:
# Evaluate both dictionaries in a single dask.compute call so that the graph
# they share (file read, flattening, padding) is executed once instead of once
# per call. dask.compute returns one result per positional argument, in order.
comp_ret, comp_im = dask.compute(retmap, imap)

In [35]:
# Show the stacked inputs; each value is still an unevaluated dask.awkward
# collection.
retmap

{'points': dask.awkward<concatenate, npartitions=1>,
 'features': dask.awkward<concatenate, npartitions=1>,
 'mask': dask.awkward<concatenate, npartitions=1>}

In [39]:
# Inspect the individual padded arrays that make up the "points" group.
imap['points'].values()

dict_values([dask.awkward<fill-none, npartitions=1>, dask.awkward<fill-none, npartitions=1>])

In [41]:
# Cast the stacked "points" input to float32 and evaluate it.
ak.values_astype(retmap["points"], "float32").compute()

In [43]:
# Evaluate the stacked "points" input without the dtype cast.
retmap['points'].compute()

In [3]:
# Call the wrapper on the event collection again and evaluate the result
# immediately (relies on pn_example1 and dask_events from an earlier cell).
pn_example1(dask_events).compute()

In [4]:
# Evaluate the model output built in an earlier cell and keep the result.
computed = dask_results.compute()

In [5]:
# Display the evaluated model output.
computed

In [6]:
class ParticleNetExample2(ParticleNetExample1):
    """ParticleNetExample1 extended with post-processing of the model output."""

    def postprocess_awkward(self, return_array, events):
        """Convert the flat model output into a per-event jagged score array.

        The score is column 0 of ``exp(return_array)`` divided by the sum of
        ``exp(return_array)`` over the last axis, i.e. the softmax value of
        column 0. The flat scores are then regrouped with the per-event count
        of ``events.Jet.pt``.

        Parameters
        ----------
        return_array
            Raw model output with one row per flattened jet.
        events
            The event collection that was passed to the wrapper.

        Returns
        -------
        The scores regrouped by ``ak.unflatten`` using the per-event counts.
        """
        exponentials = np.exp(return_array)
        normalisation = ak.sum(exponentials, axis=-1)
        first_column_score = exponentials[:, 0] / normalisation

        # Number of jets in each event, used to undo the flattening.
        jets_per_event = ak.count(events.Jet.pt, axis=-1)
        return ak.unflatten(first_column_score, jets_per_event)


# Wrapper with post-processing, loaded from the same model file.
pn_example2 = ParticleNetExample2("model.pt")

# Running on dask awkward
dask_events = open_events()
# Take the jet collection, attach the model output to it as the new field
# "MLresults", then write the modified collection back onto the events.
dask_jets = dask_events.Jet
dask_jets["MLresults"] = pn_example2(dask_events)
dask_events["Jet"] = dask_jets
# Evaluate and print the per-jet scores read back through the events object.
print(dask_events.Jet.MLresults.compute())

        is partitionwise-compatible with dask.awkward<divide, npartitions=1>
        (e.g. counts comes from a dak.num(array, axis=1)),
        otherwise this unflatten operation will fail when computed!


[[0.528, 0.528, 0.524, 0.523, 0.521, 0.52, 0.519, 0.519], ..., [0.528, ...]]


In [7]:
class ParticleNetExample(torch_wrapper):
    """Self-contained wrapper: input preparation plus output post-processing."""

    def prepare_awkward(self, events):
        """Assemble the positional and keyword arguments for the model call.

        Each per-constituent quantity is padded with zeros (or clipped) to 100
        entries per jet, the quantities belonging to one input group are
        stacked along a new axis 1, and every stacked group is cast to the
        dtype passed to the model.

        Parameters
        ----------
        events
            Event collection exposing ``Jet`` with ``constituents.pf``.

        Returns
        -------
        tuple
            ``((), kwargs)`` -- no positional arguments, and keyword arguments
            ``"points"``, ``"features"`` and ``"mask"``.
        """
        flat_jets = ak.flatten(events.Jet)
        cands = flat_jets.constituents.pf

        def fixed_width(jagged):
            # Pad or clip axis 1 to 100 entries, then turn the padding into 0.
            padded = ak.pad_none(jagged, 100, axis=1, clip=True)
            return ak.fill_none(padded, 0.0)

        # Human-readable description of the inputs. Each leaf is an
        # (N jets x 100 constituents) array; the order inside a group is the
        # order in which the quantities are stacked.
        groups = {
            "points": {
                "deta": fixed_width(flat_jets.eta - cands.eta),
                "dphi": fixed_width(flat_jets.delta_phi(cands)),
            },
            "features": {
                "dr": fixed_width(flat_jets.delta_r(cands)),
                "lpt": fixed_width(np.log(cands.pt)),
                "lptf": fixed_width(np.log(cands.pt / flat_jets.pt)),
                "f1": fixed_width(np.log(np.abs(cands.d0) + 1)),
                "f2": fixed_width(np.log(np.abs(cands.dz) + 1)),
            },
            "mask": {
                # 1 for a real constituent, 0 for a padded slot.
                "mask": fixed_width(ak.ones_like(cands.pt)),
            },
        }

        # dtype that each model keyword argument is converted to.
        dtypes = {"points": "float32", "features": "float32", "mask": "float16"}

        model_kwargs = {}
        for name, dtype in dtypes.items():
            # Give every array a length-1 axis and join the group along it.
            columns = [col[:, np.newaxis, :] for col in groups[name].values()]
            merged = ak.concatenate(columns, axis=1)
            model_kwargs[name] = ak.values_astype(merged, dtype)

        # Everything is passed by keyword, so the positional part is empty.
        return (), model_kwargs

    def postprocess_awkward(self, return_array, events):
        """Convert the flat model output into a per-event jagged score array.

        The score is column 0 of ``exp(return_array)`` divided by the sum of
        ``exp(return_array)`` over the last axis; the flat scores are then
        regrouped with the per-event count of ``events.Jet.pt``.
        """
        exponentials = np.exp(return_array)
        first_column_score = exponentials[:, 0] / ak.sum(exponentials, axis=-1)

        # Number of jets in each event, used to undo the flattening.
        jets_per_event = ak.count(events.Jet.pt, axis=-1)
        return ak.unflatten(first_column_score, jets_per_event)


# Complete wrapper (preparation + post-processing) loaded from the model file.
pn_example = ParticleNetExample("model.pt")

# Running on dask awkward arrays
dask_events = open_events()
# Attach the model output to the jet collection as the field "MLresults" and
# write the modified collection back onto the events.
dask_jets = dask_events.Jet
dask_jets["MLresults"] = pn_example(dask_events)
dask_events["Jet"] = dask_jets
print(dask_events.Jet.MLresults.compute())

# Report which input branches the MLresults computation reads from the file.
print(dask_awkward.necessary_columns(dask_events.Jet.MLresults))

[[0.528, 0.528, 0.524, 0.523, 0.521, 0.52, 0.519, 0.519], ..., [0.528, ...]]
{'from-uproot-e8eda6e42b6c187a20c9d695353963b0': frozenset({'nJetPFCands', 'JetPFCands_pFCandsIdx', 'nJet', 'Jet_nConstituents', 'PFCands_phi', 'PFCands_dz', 'PFCands_d0', 'Jet_phi', 'Jet_eta', 'Jet_pt', 'nPFCands', 'PFCands_pt', 'PFCands_eta'})}
