In [1]:
%run utils.py

In [2]:
from ipywidgets import FileUpload
upload = FileUpload()
upload

FileUpload(value={}, description='Upload')

In [3]:
setup_rucio_and_proxy(upload.data[-1])

In [4]:
from functools import partial
import dask
from dask import delayed
import rucio.client
import awkward as ak
import numpy as np
import matplotlib.pyplot as plt
import uproot
import numba
import vector
from tqdm.auto import tqdm
from dask.distributed import futures_of
from physlite_experiments.physlite_events import physlite_events
from physlite_experiments.io import AIOHTTPSource

In [6]:
rucio_client = rucio.client.Client(ca_cert=False)



In [7]:
#files = list(rucio_client.list_files("data17_13TeV", "data17_13TeV.periodK.physics_Main.PhysCont.DAOD_PHYSLITE.grp17_v01_p4309"))

In [8]:
files = []
for file in tqdm(rucio_client.list_files(
    "user.nihartma",
    "user.nihartma.data_13TeV.all.DAOD_PHYSLITE.grp15_v01_p4309"
)):
    files.append(file)



0it [00:00, ?it/s]

In [9]:
import os

In [10]:
[k for k in os.environ if "x509" in k.lower()]

['X509_USER_PROXY']

In [11]:
os.environ["JUPYTER_IMAGE"]

'eu.gcr.io/gke-dev-311213/jupyter-physlite:preprod'

In [12]:
class Source(AIOHTTPSource):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, tcp_connection_limit=100, **kwargs)

In [15]:
def get_4leptons(url):
    with uproot.open(url, http_handler=Source) as f:
        tree = f["CollectionTree"]
        events = physlite_events(tree)
        print(len(events))
        array = ak.zip(
            {
                "Electrons": ak.zip(
                    {
                        "p4": ak.zip({k: events.Electrons[k] for k in ["pt", "eta", "phi", "m"]}),
                        "charge": events.Electrons["charge"]
                    }
                ),
                "Muons": ak.zip(
                    {
                        "p4": ak.zip({k: events.Muons[k] for k in ["pt", "eta", "phi"]}),
                        "charge": events.Muons["charge"]
                    }
                )
            },
            depth_limit=1
        )
        array["Muons", "p4", "m"] = 105.6583715
        array = array[
            ((ak.num(array.Electrons) >= 2) & (ak.num(array.Muons) >= 2))
            | (ak.num(array.Electrons) >= 4)
            | (ak.num(array.Muons) >= 4)
        ]
        return ak.to_arrow(array)

In [16]:
def get_p4(url):
    with uproot.open(url, http_handler=Source) as f:
        tree = f["CollectionTree"]
        events = physlite_events(tree)
        print(len(events))
        p4 = ak.zip({k: events.Muons[k] for k in ["pt", "eta", "phi", "charge"]}, with_name="PtEtaPhiMLorentzVector")
        p4["mass"] = 105.6583715
        p4 = p4[ak.num(p4) >= 4]
        return ak.to_arrow(p4)

In [17]:
def get_m4mu(url):
    p4 = ak.with_name(ak.from_arrow(get_p4(url)), "PtEtaPhiMLorentzVector")
    combinations = ak.unzip(ak.combinations(p4, 4))
    charge_sum = ak.sum(ak.concatenate(combinations, axis=1).charge, axis=1)
    combinations = [c[charge_sum == 0] for c in combinations]
    return ak.to_numpy(ak.flatten(sum(combinations[1:], combinations[0]).mass))

In [18]:
def get_m4mu(url):
    with uproot.open(url, http_handler=Source) as f:
        tree = f["CollectionTree"]
        events = physlite_events(tree)
        p4 = ak.zip({k: events.Muons[k] for k in ["pt", "eta", "phi"]}, with_name="PtEtaPhiMLorentzVector")
        p4["mass"] = 105.6583715
        combinations = ak.unzip(ak.combinations(p4, 4))
        return ak.to_numpy(ak.flatten(sum(combinations[1:], combinations[0]).mass))

In [19]:
url = get_signed_url(rucio_client, files[0]["scope"], files[0]["name"])



In [20]:
%%time
array = ak.from_arrow(get_4leptons(url))

Skipping EventInfoAuxDyn.streamTagRobs
Skipping EventInfoAuxDyn.streamTagDets
Can't interpret PrimaryVerticesAuxDyn.neutralParticleLinks
Skipping AnalysisHLT_tau35_medium1_tracktwoEF_tau25_medium1_tracktwoEF_03dR30_L1DR-TAU20ITAU12I-J25AuxDyn.TrigMatchedObjects
Skipping AnalysisHLT_tau35_medium1_tracktwoEF_tau25_medium1_tracktwoEF_L1DR-TAU20ITAU12I-J25AuxDyn.TrigMatchedObjects
Skipping AnalysisHLT_tau35_medium1_tracktwo_tau25_medium1_tracktwo_03dR30_L1DR-TAU20ITAU12I-J25AuxDyn.TrigMatchedObjects
Skipping AnalysisHLT_tau35_medium1_tracktwo_tau25_medium1_tracktwo_L1DR-TAU20ITAU12I-J25AuxDyn.TrigMatchedObjects
Skipping AnalysisHLT_tau35_medium1_tracktwo_tau25_medium1_tracktwo_tautsf_L1DR-TAU20ITAU12I-J25AuxDyn.TrigMatchedObjects
Skipping AnalysisHLT_mu14_ivarloose_tau25_medium1_tracktwoEF_L1DR-MU10TAU12I_TAU12I-J25AuxDyn.TrigMatchedObjects
Skipping AnalysisHLT_mu14_ivarloose_tau25_medium1_tracktwo_L1DR-MU10TAU12I_TAU12I-J25AuxDyn.TrigMatchedObjects
Skipping AnalysisHLT_mu14_ivarloose_tau2

In [21]:
array.nbytes

496

In [22]:
from dask_gateway import GatewayCluster
cluster = GatewayCluster(
    worker_cores=1,
    worker_memory=3.5,
    #image="eu.gcr.io/gke-dev-311213/dask-gateway-physlite:20210622"
    #image="eu.gcr.io/gke-dev-311213/jupyter-physlite:20210622"
    #image="eu.gcr.io/gke-dev-311213/jupyter-physlite:20210709"
    image=os.environ["JUPYTER_IMAGE"]
)

  from distributed.utils import LoopRunner, format_bytes


In [23]:
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [78]:
cluster.shutdown()

In [26]:
client = cluster.get_client()

In [50]:
cluster.scale(720)
#cluster.scale(16)

In [51]:
client.restart()

0,1
Connection method: Cluster object,Cluster type: dask_gateway.GatewayCluster
Dashboard: /services/dask-gateway/clusters/default.f57301bfcf744b4d8711c8c7a0cef6fe/status,


In [52]:
len(files)

264076

In [53]:
def merge(arrow_list):
    return ak.concatenate(ak.from_arrow(x) for x in arrow_list)

In [54]:
x509_data = upload.data[-1]

In [55]:
def run_task(x509_data, scope, name):
    url = get_signed_url_worker(
        x509_data,
        scope,
        name,
        rucio_account="nihartma",
        rucio_home="/srv/conda/envs/notebook",
        ca_cert=False,
    )
    return get_4leptons(url)

In [58]:
fut_x509_data = client.scatter(x509_data, broadcast=True)

In [None]:
client.who_has(fut_x509_data)

In [60]:
tasks = []
sub_tasks = []
n_sub = 100
for file in tqdm(files):
    sub_tasks.append(client.submit(run_task, fut_x509_data, file["scope"], file["name"]))
    if len(sub_tasks) >= n_sub:
        tasks.append(client.submit(merge, sub_tasks))
        sub_tasks = []

  0%|          | 0/264076 [00:00<?, ?it/s]

In [65]:
len(tasks)

2640

In [70]:
len([fut for fut in tasks if fut.status == "finished"])

2630

In [71]:
tasks[0].result()

<Array [{Electrons: [], ... Muons: []}] type='2173 * {"Electrons": var * {"p4": ...'>

In [72]:
tasks[0].result().nbytes

241300

In [73]:
p4_results = client.gather([fut for fut in tasks if fut.status == "finished"])

In [74]:
len(p4_results)

2630

In [75]:
p4_ak = ak.concatenate(p4_results)

In [76]:
p4_ak.nbytes / 1024 / 1024

362.04543685913086

In [77]:
ak.to_parquet(p4_ak, "leptons_100percent.parquet")