In [None]:
from pathlib import Path
import datetime
import traceback

import dask
import dask_awkward as dak
import hist.dask
import coffea
import numpy as np
import uproot

from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from coffea.analysis_tools import PackedSelection
from coffea import dataset_tools

from functools import partial
import cloudpickle
from collections import defaultdict
import os
import time
import warnings

from ndcctools.taskvine import DaskVine, PythonTask, FunctionCall
    
warnings.filterwarnings("ignore")
NanoAODSchema.warn_missing_crossrefs = False # silences warnings about branches we will not use here

In [None]:
BRANCH_LIST = [
        "run", "luminosityBlock", "event",
        "GenPart_pt", "GenPart_eta", "GenPart_phi", "CorrT1METJet_phi",
        "GenJet_pt", "CorrT1METJet_eta", "SoftActivityJet_pt",
        "Jet_eta", "Jet_phi", "SoftActivityJet_eta", "SoftActivityJet_phi",
        "CorrT1METJet_rawPt", "Jet_btagDeepFlavB", "GenJet_eta", 
        "GenPart_mass", "GenJet_phi",
        "Jet_puIdDisc", "CorrT1METJet_muonSubtrFactor", "Jet_btagDeepFlavCvL",
        "Jet_btagDeepFlavQG", "Jet_mass", "Jet_pt", "GenPart_pdgId",
        "Jet_btagDeepFlavCvB", "Jet_cRegCorr"
]

In [None]:
def report_stats(out, t0=None, t1=None, failed=False):
    if not out:
        return

    if not isinstance(out, dict):
        # something is weird with that output
        print(out)
        return

    if t0 is None:
        t0 = out["start"]
        
    if t1 is None:
        t1 = out["end"]

    if failed:
        print(f"{out['failed']}")

    print(f"chunks: {out['chunks']}")
    print(f"events: {out['num_events']}")
    # summary of performance
    read_GB = out['read'] / 1000**3
    print(f"total read (compressed): {read_GB:.2f} GB")
    print(f"total read (uncompressed): {out['uncompressed'] / 1000**3:.2f} GB")

    rate_Gbps = read_GB*8/(t1-t0)
    if rate_Gbps == 0:
        rate_Gbps = 0.000000001
    print(f"average data rate: {rate_Gbps:.2f} Gbps (need to scale by x{200/rate_Gbps:.0f} to reach 200 Gbps)")

    n_evts = out["num_entries"]
    print(f"total event rate (wall clock time): {n_evts / (t1-t0) / 1000:.2f} kHz (processed {n_evts} events total)")

    total_runtime = out["runtime"]
    print(f"total aggregated runtime in function: {total_runtime:.2f} s")
    print(f"ratio total runtime / wall clock time: {total_runtime / (t1-t0):.2f} "\
          "(should match # cores without overhead / scheduling issues)")
    print(f"event rate (aggregated time spent in function): {n_evts / total_runtime / 1000:.2f} kHz")               
    print(f"failed: {len(out['failed'])}/{out['chunks']}")
    print(f"not dict: {out['not_dict']}")



In [None]:
# wrappers for debugging
def get_bytes(key, fn, *args):
    # get the transfer stats out of the dask function
    r = fn(*args)
    if key[0] == "accum":
        return (None, r)
    else:
        return ((r['read'], r['uncompressed'], r['num_entries'], r['start'], r['end'], r['chunks'], len(r['failed'])), r)

def avg_bytes():
    output_file = open("stats.csv", "w")
    print(f"read,uncompressed,num_entries,start,end,chunks,nfailed", file=output_file)

    stats = defaultdict(lambda: 0)
    stats['start'] = None

    def avg(info, stats=stats, output_file=output_file):
        if not info:
            return
        read, uncompressed, num_entries, start, end, chunks, nfailed = info

        print(f"{read},{uncompressed},{num_entries},{start},{end},{chunks},{nfailed}", file=output_file)

        if not stats['start']:
            stats['start'] = start

        if stats['start'] > start:
            stats['start'] = start

        if stats['end'] < start:
            stats['end'] = end

        stats['total'] += read

        if stats['counter'] % 1000 == 0:
            bps = (8*stats['total'])/(stats['end'] - stats['start'])
            if bps > stats['max_rate_seen']:
                stats['max_rate_seen'] = bps
            print(f"rate: {bps/1000**3:.2f} Gbps, read: {stats['total']/1000**3:.2f} GB,  max seen: {stats['max_rate_seen']/1000**3:.2f} Gbps")
            output_file.flush()

        stats['counter'] += 1
    return avg


def trace_memory(key, fn, *args):
    import tracemalloc
    tracemalloc.start()
    before = tracemalloc.get_traced_memory()
    b = fn(*args)
    after = tracemalloc.get_traced_memory()
    tracemalloc.stop()

    memory_used = (after[1] - before[1])/1000**3
    memory_used = after[1]/1000**3
    a = (key, memory_used)

    return (a, b)

def trace_memory_peak(arg):
    stats = {'max_memory': 0}
    def mem(arg, stats=stats):
        (key, memory) = arg
        if memory > stats['max_memory']:
            stats['max_memory'] = memory
            print("new max", key, memory)
    return mem



In [None]:
# work for coffea
def do_stuff_real(events):
    # track number of events
    num_entries = ak.num(events, axis=0)

    # read out all other branches into integers to avoid memory issues
    _counter = 0
    for obj_to_add in [
        events.GenPart.pt,
        events.GenPart.eta,
        events.GenPart.phi,
        events.CorrT1METJet.phi,
        events.GenJet.pt,
        events.CorrT1METJet.eta,
        events.SoftActivityJet.pt,
        events.Jet.eta,
        events.Jet.phi,
        events.SoftActivityJet.eta,
        events.SoftActivityJet.phi,
        events.LHEPart.eta,
        events.LHEPart.phi,
        events.CorrT1METJet.rawPt,
        events.Jet.btagDeepFlavB,
        events.GenJet.eta,
        events.GenPart.mass,
        events.GenJet.phi,
        events.Jet.puIdDisc,
        events.CorrT1METJet.muonSubtrFactor,
        events.Jet.btagDeepFlavCvL,
        events.LHEPart.mass,
        events.LHEPart.pt,
        events.Jet.btagDeepFlavQG,
        events.Jet.mass,
        events.Jet.pt,
        events.GenPart.pdgId,
        events.Jet.btagDeepFlavCvB,
        events.Jet.cRegCorr,
        events.LHEPart.incomingpz
    ]:
        _counter_to_add = ak.count_nonzero(obj_to_add, axis=1)

        # reduce >2-dimensional (per event) branches further
        for _ in range(_counter_to_add.ndim - 1):
            _counter_to_add = ak.count_nonzero(_counter_to_add, axis=-1)

        _counter = _counter + _counter_to_add  # sum 1-dim array built from new branch

    _counter = ak.count_nonzero(_counter, axis=0)  # reduce to int

    return {"num_entries": num_entries, "_counter": _counter}



In [None]:
# work for uproot.dask
# specs: [(filename, obj_path, entry_start, entry_stop), ...]
def do_uproot_read_dask(specs, compute=False):
    import dask
    import awkward as ak
    import time
    import gc

    t0 = time.time()

    size_read = 0
    size_uncompressed = 0
    num_entries = 0
    ccounter_all = 0
    failed = []

    try:
        for (filename, path, entry_start, entry_stop) in specs:
            try:
                spec = {f"{filename}": {"object_path": "Events", "steps": [entry_start, entry_stop]}}
                pre_events, pre_report = uproot.dask(spec, filter_name=BRANCH_LIST, allow_read_errors_with_report=True, entry_start=entry_start, entry_stop=entry_stop)
                events, report = dask.compute(pre_events, pre_report, num_workers=1)

                if report.exception and report.exception[0]:
                    raise Exception(f"{filename}: {report.exception} {report.message}")

                num_entries += ak.num(events, axis=0)

                if compute:
                    ccounter_all = 0
                    for b in BRANCH_LIST:
                        try:
                            ccounter_all += ak.count_nonzero(events[b], axis=None)
                        except Exception:
                            pass

                size_uncompressed = events.nbytes
                size_read = sum(report['performance_counters']['num_requested_bytes'])
            except Exception as e:
                #failed.append({filename: traceback.format_exc()})
                failed.append({filename: e})
                print(f"{filename}: {e}")
            finally:
                events = None
                report = None
                # gc.collect()
    except Exception as e:
        size_read = 0
        size_uncompressed = 0
        num_entries = 0
        ccounter_all = 0

    t1 = time.time()

    return {"chunks": len(specs), "read": size_read, "uncompressed":
            size_uncompressed, "num_entries": num_entries, "compute_counter":
            ccounter_all, "runtime": t1-t0, "start": t0, "end": t1, "failed": failed}

In [None]:
# this is the one used for the workshop results
def do_uproot_read_open(specs, compute=False):
    import uproot
    import time
    import gc
    import signal

    def handler(num, stack):
        raise TimeoutError(f"{specs}")

    signal.alarm(300)
    signal.signal(signal.SIGALRM, handler)


    t0 = time.time()

    size_read = 0
    size_uncompressed = 0
    num_entries = 0
    ccounter_all = 0
    failed = []
    try:
        for (filename, path, entry_start, entry_stop) in specs:
            try:
                with uproot.open(f"{filename}") as froot:
                    if not entry_start:
                        entry_start = 0
                    if not entry_stop:
                        entry_stop = froot[path].num_entries
                    num_entries += entry_stop - entry_start
                    for b in BRANCH_LIST:
                        try:
                            froot["Events"][b].array(entry_start=entry_start, entry_stop=entry_stop)
                            size_uncompressed += froot["Events"][b].uncompressed_bytes
                        except uproot.exceptions.KeyInFileError:
                            pass

                    size_read += froot.file.source.num_requested_bytes
            except Exception as e:
                #failed.append({filename: traceback.format_exc()})
                failed.append({filename: e})
                print(f"{filename}: {e}")
                raise
            finally:
                # gc.collect()
                pass
    except Exception as e:
        size_read = 0
        size_uncompressed = 0
        num_entries = 0
        ccounter_all = 0
        #raise

    t1 = time.time()

    return {"chunks": len(specs), "read": size_read, "uncompressed":
            size_uncompressed, "num_entries": num_entries, "compute_counter":
            ccounter_all, "runtime": t1-t0, "start": t0, "end": t1, "failed": failed}



In [None]:
# accumulation task for manual graph
# adds numbers and extends lists values from partial dictionaries
def accum_dict(*partials):
    from collections import defaultdict
    out = defaultdict(lambda: 0)
    out["failed"] = []

    start = None
    end = None
    for p in partials:
        if not p:
            continue

        if not isinstance(p, dict):
            out["not_dict"] += 1
            continue
        try:
            pstart = p["start"]
            if not start or pstart < start:
                start = pstart
        except KeyError:
            pass

        try:
            pend = p["end"]
            if not end or pend > end:
                end = pend
        except KeyError:
            pass

        if "failed" in p and p["failed"]:
            print(p["failed"])

        for k, v in p.items():
            if isinstance(v, list):
                out[k].extend(v)
            else:
                try:
                    out[k] += v
                except KeyError:
                    out[k] = v
                except TypeError:
                    print(k, v)
    out["start"] = start
    out["end"] = end

    return out



In [None]:
# construct manual graph
# samples: from coffea preprocess
# fn: function to apply to samples (e.g., do_uproot_read_dask)
# times: how many times to repeat each sample
# files_per_task: number of files to assign to a single read task
# compute: Whether to make a simple computation on the data read (i.e., count nonzero entries)
# accumulate: Whether to accumulate per dataset (True), or return each read result indiviudually (False)
# accumulate_all: Whether to accumulate the individual datasets into a single result.
# accum_fn: Function to accumulate results (default is to accumulate as dictionaries)
# accum_chunk: Number of resutls per accumualtion task
def manual_graph(samples, fn, times=1, files_per_task=1, compute=False, accumulate=True, accumulate_all=False, accum_fn=accum_dict, accum_chunk=10):
    
    def add_vertex(graph, process, on_queue, files_for_task):
        # if len(graph) > 1000:
        #    return
        key = (f"uproot:{process}", len(graph))
        graph[key] = (fn, files_for_task, compute)
        on_queue.append(key)

    targets = []
    graph = {}
    for _ in range(times):
        files_for_task = []

        for process in samples:
            on_queue = []

            for name, info in samples[process]["files"].items():
                obj_path = info.get('object_path', 'Events')
                steps = info.get('steps', [[None, None]])

                for (start, stop) in steps:
                    files_for_task.append([name, obj_path, start, stop])
                    if len(files_for_task) >= files_per_task:
                        add_vertex(graph, process, on_queue, files_for_task)
                        files_for_task = []

            if len(files_for_task) > 0:
                add_vertex(graph, process, on_queue, files_for_task)
                files_for_task = []

            if accumulate:
                while on_queue:
                    args = on_queue[0:accum_chunk]
                    on_queue = on_queue[accum_chunk:]
                    key = ("accum", process, len(graph))
                    graph[key] = (accum_fn, *args)
                    if on_queue:
                        on_queue.append(key)
                    else:
                        targets.append(key)
            else:
                targets.extend(on_queue)
    if accumulate and accumulate_all:
        on_queue = targets
        targets = []
        while on_queue:
            args = on_queue[0:accum_chunk]
            on_queue = on_queue[accum_chunk:]
            key = ("accum", len(graph))
            graph[key] = (accum_fn, *args)
            if on_queue:
                on_queue.append(key)
            else:
                targets.append(key)
    return (graph, targets)
 



In [None]:
print(f"uproot: {uproot.__version__}")
print(f"hist: {hist.__version__}")
print(f"coffea: {coffea.__version__}")

manager = DaskVine(port=8786, ssl=True, name=f"{os.environ.get('USER', 'noname')}-coffea-casa",run_info_path="/mnt/data/btovar-logs/",)

extra_files = {}
env_vars = {}
    
token_acc_path = "/etc/cmsaf-secrets-chown/access_token"
token_xch_path = "/etc/cmsaf-secrets-chown/xcache_token"

if Path(token_acc_path).is_file():
    extra_files[manager.declare_file(token_acc_path, cache=True)] = "access_token"
    env_vars["BEARER_TOKEN_FILE"] = "access_token"
if Path(token_xch_path).is_file():
    extra_files[manager.declare_file(token_xch_path, cache=True)] = "xcache_token"
    env_vars["XCACHE_FILE"] = "xcache_token"


# bring back accumulation task results for better disk garbage collection
def checkpoint_accum(dag, key):
    if key[0] == "accum":
        return True

# usually we put all these options directly into dask calls,
# but coffea preprocessing only allows one argument to set the scheduler,
# thus we create a partial of manager.get, which is the function that takes
# a dask graph and executes it.
vine_scheduler = partial(manager.get,
                         resources={"cores": 1},  #  max 1 core, 5GB of disk per task
                         #resources_mode='fixed',
                         resources_mode=None,   # set to "fixed" to kill tasks on resources
                         extra_files=extra_files,
                         checkpoint_fn=checkpoint_accum,
                         env_vars=env_vars,
                         submit_per_cycle=1000,  # throttle submission to keep memory usage low,
                         max_pending=6000,       # and start doing work faster
                         worker_transfers=True,  # keep partials at workers
                         task_mode="function-calls", # use one interpreter per worker
                         lib_resources={"cores": 8, "slots": 8}, # resources and functions a single interpreter can run
                         # environment="env.tar.gz",   # needed for task_mode="tasks" if taskvine version at worker 
                                                       # is behind: poncho_package_create $CONDA_PREFIX env.tar.gz,
                                                       # or if more modules are needed at the execution site.
                        )

# given to coffea and dask functions as **scheduler_options to make taskvine the scheduler
scheduler_options = {}
scheduler_options['scheduler'] = vine_scheduler

In [None]:
# read datasets
import json
fname = "zstd_files.json"

fileset = {}
with open(fname,'r') as fp:
    for i,(dataset_name,file_list) in enumerate(json.load(fp).items()):
        fileset[dataset_name] = {"files": {}}
        for j,dataset_fpath in enumerate(file_list):
            xrd_fpath = f"root://xcache.cmsaf-dev.flatiron.hollandhpc.org:1094/{dataset_fpath}"
            fileset[dataset_name]["files"][xrd_fpath] = "Events"

In [None]:
# preprocess
try:
    # step_size = 50_000
    # step_size = 100_000
    # step_size = 500_000
    step_size = 5_000_000

    with open(f"preprocessed_{step_size}.pkl", "rb") as f:
        # do not re preprocess if we don't have too...
        samples = cloudpickle.load(f)
except Exception:
    scheduler_options['scheduler'] = vine_scheduler
    samples, report = dataset_tools.preprocess(fileset, step_size=step_size, skip_bad_files=True, uproot_options={"allow_read_errors_with_report": True}, **scheduler_options)
    with open(f"preprocessed_{step_size}.pkl", "wb") as f:
        cloudpickle.dump(samples, f)

    total_files  = sum([len(p["files"]) for p in samples.values()])
    total_chunks = sum(sum(len(f["steps"]) for f in p["files"].values()) for p in samples.values())
    print(f"nfiles: {total_files} chunks: {total_chunks}")



In [None]:
# run with manual graph
if True:
    # create the graph
    tasks, targets = manual_graph(samples, times=2, files_per_task=1, fn=do_uproot_read_open, compute=False, accumulate=True, accumulate_all=True, accum_fn=accum_dict, accum_chunk=10)

    print(f"tasks: {len(tasks)}")
    
    # send twice the functions that can fit to a worker. With this, a worker can start working in the next load
    # while waiting for the manager to retrieve completed results.
    manager.tune("resource-submit-multiplier", 2)
    
    # keep a second copy of each results in the workers. More relevant when there is eviction, but it helps a little
    # when accumulating results, as workers can more easily find available sources for transfers.
    manager.tune("temp-replica-count", 2)

    # execute task graph
    t0 = time.time()
    outs = vine_scheduler(tasks, targets, progress_label='[green]process',
                          wrapper=get_bytes, wrapper_proc=avg_bytes())
    #outs = vine_scheduler(tasks, targets, progress_label='[green]process', wrapper=trace_memory, wrapper_proc=trace_memory_peak())

    ### or use dask client
    # from dask.distributed import Client, performance_report
    # client = Client("tls://localhost:8786")
    # outs = client.get(tasks, targets)

    t1 = time.time()

    print("stats with IO time")
    report_stats(outs[0], failed=True)

    print("stats IO + accumulation")
    report_stats(outs[0], t0, t1)



In [None]:
# regular coffea
if False:
    t0 = time.perf_counter()

    # change default scheduler
    tasks = dataset_tools.apply_to_fileset(do_stuff_real, samples, uproot_options={"allow_read_errors_with_report": (OSError, TypeError, KeyError), "filter_name": BRANCH_LIST})

    #(out, report) = dask.compute(tasks, **scheduler_options, progress_label="[green]process")
    ((out,report),) = dask.compute(tasks, **scheduler_options)
    t1 = time.perf_counter()

    try:
        print(f"total time spent in uproot reading data: {ak.sum([v['duration'] for v in report.values()]):.2f} s")
        print(f"wall time: {t1-t0:.2f}s")
        print(f"events: {sum(out[k]['num_entries'] for k in out)}")
        event_rate = sum(out[k]["num_entries"] for k in out)

        event_rate = event_rate / (t1-t0)
        print(f"event rate: {event_rate / 1_000:.2f} kHz")

        read_GB = ak.sum([v['performance_counters']['num_requested_bytes'] for v in report.values()]) / 1_000**3
        rate_Gbs = read_GB / (t1-t0)
        print(f" - read {read_GB:.2f} GB in {t1-t0:.2f} s -> {rate_Gbs:.2f} GBps (need to scale by x{200/8/rate_Gbs*1000:.0f} to reach 200 Gbps)")
    except Exception:
        pass

    with open("outs.pkl", "wb") as f:
        cloudpickle.dump((out, report), f)