In [None]:
import glob
import time
import uproot
import tqdm
import math
from functools import partial
import pandas as pd
import numpy as np
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from dask_gateway import Gateway

In [None]:
# Example of a single file addressed through XRootD:
# file = "root://eos.cms.rcac.purdue.edu//store/data/Run2016B/SingleMuon/NANOAOD/02Apr2020_ver2-v1/20000/014D129C-DD09-A748-BB1C-81184C4A8DDD.root"

# `prefix_mount` is the local path prefix used for globbing; `prefix` is the
# XRootD prefix that replaces it in the final file list.
prefix = "root://eos.cms.rcac.purdue.edu/"
prefix_mount = "/eos/purdue/"

datasets = [
    # 2016
    "/store/data/Run2016B/SingleMuon/NANOAOD/02Apr2020_ver2-v1",
    "/store/data/Run2016C/SingleMuon/NANOAOD/02Apr2020-v1",
    "/store/data/Run2016D/SingleMuon/NANOAOD/02Apr2020-v1",
    "/store/data/Run2016E/SingleMuon/NANOAOD/02Apr2020-v1",
    "/store/data/Run2016F/SingleMuon/NANOAOD/02Apr2020-v1",
    "/store/data/Run2016G/SingleMuon/NANOAOD/02Apr2020-v1",
    "/store/data/Run2016H/SingleMuon/NANOAOD/02Apr2020-v1",
    #2017
    "/store/data/Run2017B/SingleMuon/NANOAOD/02Apr2020-v1",
    "/store/data/Run2017C/SingleMuon/NANOAOD/02Apr2020-v1",
    "/store/data/Run2017D/SingleMuon/NANOAOD/02Apr2020-v1",
    "/store/data/Run2017E/SingleMuon/NANOAOD/02Apr2020-v1",
    "/store/data/Run2017F/SingleMuon/NANOAOD/02Apr2020-v1",
    #2018
    "/store/data/Run2018A/SingleMuon/NANOAOD/02Apr2020-v1",
    "/store/data/Run2018B/SingleMuon/NANOAOD/02Apr2020-v1",
    "/store/data/Run2018C/SingleMuon/NANOAOD/02Apr2020-v1",
    "/store/data/Run2018D/SingleMuon/NANOAOD/02Apr2020-v1",
]

# all ROOT files in datasets
files = []
for dataset in datasets:
    # glob returns matches in arbitrary order; sort them so that the file list
    # (and in particular files[0]) is the same on every run
    ds_files = sorted(glob.glob(prefix_mount+dataset+"/*/*root"))
    print(f"{dataset}: {len(ds_files)}")
    files.extend(ds_files)

# replace explicit path /eos/purdue with XRootD prefix
# (count=1: only the leading mount prefix is substituted)
files = [f.replace(prefix_mount, prefix, 1) for f in files]

# files = [files[0], files[1]]
print(len(files), "files")

In [None]:
def get_columns(file, fraction=1):
    '''
        List the [branch, leaf] pairs found in a NanoAOD file.

        Structure of a NanoAOD file looks like this:
          - branch1
            - leaf1
            - leaf2
          - branch2
            - leaf1
            - leaf2
            - leaf3
        We will return list of branches/leaves in the following format:
          [
            [branch1, leaf1],
            [branch1, leaf2],
            [branch2, leaf1],
            [branch2, leaf2],
            [branch2, leaf3],
          ]

        Parameters:
          file: file specification passed to NanoEventsFactory.from_root
          fraction: share of the full list to keep; the first
            ceil(fraction * n_pairs) pairs are returned

        Returns:
          numpy string array of shape (n, 2); rows are [branch, leaf],
          in the order the fields are reported by the events object
    '''

    # full NanoAOD event
    events = NanoEventsFactory.from_root(
        file,
        schemaclass=NanoAODSchema.v6,
        uproot_options={"timeout": 120}
    ).events()

    # Collect all pairs first and convert once: appending to a numpy array
    # inside the loop copies the whole array on every iteration.
    pairs = [
        (branch, leaf)
        for branch in events.fields
        for leaf in events[branch].fields
    ]

    # reshape keeps the (n, 2) layout even when no pairs were found
    all_columns = np.array(pairs, dtype=str).reshape(-1, 2)

    # select a fraction of leaves
    columns_subset = all_columns[:math.ceil(fraction * len(all_columns))]

    return columns_subset


# Workflow to run for each file
def process(file, columns=()):
    '''
        Open one file and compute the mean of each requested column.

        Parameters:
          file: file specification passed to NanoEventsFactory.from_root
          columns: iterable of [branch, leaf] pairs (the format returned
            by get_columns)

        Returns:
          (nevents, mean_values): len() of the events object and a dict
          keyed by "<branch>_<leaf>". A column whose branch or leaf is not
          present in this file is recorded as 0.
    '''
    events = NanoEventsFactory.from_root(
        file,
        schemaclass=NanoAODSchema.v6,
        uproot_options={"timeout": 120}
    ).events()

    # We will compute mean values for a given subset of columns.
    # This way we can be sure that we access every element in a column.
    mean_values = {}
    for column in columns:
        branch = column[0]
        leaf = column[1]
        key = f"{branch}_{leaf}"
        # The column list may come from a different file, so neither the
        # branch nor the leaf is guaranteed to exist here; check the branch
        # first so that events[branch] is only evaluated when it is valid.
        if branch in events.fields and leaf in events[branch].fields:
            mean_values[key] = np.mean(events[branch][leaf])
        else:
            mean_values[key] = 0

    nevents = len(events)
    return nevents, mean_values


In [None]:
# Measure time for a list of files
def run_benchmark(process, files, columns=(), parallel=False, client=None):
    '''
        Run `process` over all files and print the event throughput.

        Parameters:
          process: callable invoked as process(file, columns=columns);
            must return a pair whose first element is the event count
          files: iterable of files to process
          columns: forwarded unchanged to `process`
          parallel: if True, distribute the calls with client.map and
            collect them with client.gather; otherwise loop sequentially
            with a tqdm progress bar
          client: client object used when parallel=True

        Raises:
          ValueError: parallel=True was requested without a client

        Prints the total number of events, the elapsed time in seconds
        and the resulting events per second. Returns None.
    '''
    if parallel and client is None:
        # A bare string cannot be raised in Python 3; use a real exception.
        raise ValueError("Dask client is missing!")

    # perf_counter is monotonic, so the measurement is unaffected by
    # system clock adjustments
    tick = time.perf_counter()

    if parallel:
        # Parallel processing using Dask
        futures = client.map(partial(process, columns=columns), files)
        results = client.gather(futures)
    else:
        # Sequential processing
        results = (process(file, columns=columns) for file in tqdm.tqdm(files))

    # Only the event count of each result is needed for the benchmark
    nevts_total = sum(nevts for nevts, _ in results)

    elapsed = time.perf_counter() - tick
    # The rate is undefined if no measurable time has passed
    rate = nevts_total / elapsed if elapsed > 0 else float("nan")

    print(nevts_total, "events")
    print(round(elapsed,3), "s")
    print(rate, "evts/s")

    

In [None]:
# The column selection is taken from the first file in the list only;
# `fraction` sets how much of that file's column list is kept.
columns = get_columns(file=files[0], fraction=0.02405)
print(len(columns), "branches")

In [None]:
# Sequential processing
#run_benchmark(process, files, columns)

In [None]:
# Request a new cluster through Dask Gateway on the "cms-express" queue.
# The env entries are handed to new_cluster together with the queue name.
gateway = Gateway()
cluster = gateway.new_cluster(
    # reservation="DASKTEST",
    queue="cms-express",
    env=dict(
        PYTHONPATH="/depot/cms/private/users/dkondra/af-benchmark",
        X509_USER_PROXY="/depot/cms/private/users/dkondra/x509up_u616617",
    ),
)
# Scale the cluster to 100
cluster.scale(100)
# Last expression of the cell: displays the cluster object
cluster

In [None]:
# Process via Dask Gateway cluster
run_benchmark(
    process,
    files,
    columns=columns,
    parallel=True,
    client=cluster.get_client(),
)

In [None]:
# Shut down the Dask Gateway cluster currently held in `cluster`.
cluster.shutdown()

In [None]:
# Create a Gateway handle (no arguments given) so that this cell can be run
# on its own, without the cluster-creation cell.
gateway = Gateway()
# List existing clusters
gateway.list_clusters()
# Kept for reference: inspect the options accepted when creating a cluster
# options = gateway.cluster_options()
# options

In [None]:
# Re-attach to an already existing cluster by its name.
# NOTE(review): the name is hard-coded; presumably it should be copied from
# the output of gateway.list_clusters() for the current session - confirm.
name = "d6e87428c2f04da2964690275284afb3"
cluster = gateway.connect(name)
# Take the client from the handle obtained above instead of calling
# gateway.connect(name) a second time just to get a client.
client = cluster.get_client()
cluster