# Dask through Jupyter Notebooks

This notebook runs a simple study on SUEP data using Dask, creating an output coffea file that can be analyzed in Dask_analysis.ipynb.

In [1]:
import os
import sys
import json
import time
import random
import numpy as np
import coffea
coffea.deprecations_as_errors = False #Get rid of warning for now
from coffea import hist, processor
from hist import Hist
import matplotlib

from dask_jobqueue import SLURMCluster
from distributed import Client
from dask.distributed import performance_report

(Set coffea.deprecations_as_errors = True to get a stack trace now.)
ImportError: coffea.hist is deprecated
  from distributed.utils import tmpfile


We set up a coffea ABC Processor to analyze the ROOT files.

In [2]:
import awkward as ak
import vector
vector.register_awkward()

class Simple_Process(processor.ProcessorABC):
    def __init__(self, isMC: int, era: int, sample: str) -> None:
        self.gensumweight = 1.0
        self.era = era
        self.isMC = isMC
        self.sample = sample

        self._accumulator = processor.dict_accumulator(
            {
                "ht_reco": hist.Hist(
                    "Events",
                    hist.Cat("dataset", "Dataset"),
                    hist.Bin("ht_reco", r"$H_T$ [GeV]", 50,0,2500),
                ),
                "ht_reco_triggered": hist.Hist(
                    "Events",
                    hist.Cat("dataset", "Dataset"),
                    hist.Bin("ht_reco_triggered", r"$H_T$ [GeV]",  50,0,2500),
                ),
                "nmuons": hist.Hist(
                    "Events",
                    hist.Cat("dataset", "Dataset"),
                    hist.Bin("nmuons", r"$N_{muons}$", 30, 0, 30),
                ),
                "muon_pt": hist.Hist(
                    "Events",
                    hist.Cat("dataset", "Dataset"),
                    hist.Bin("muon_pt", r"$Muon p_{T}$ [GeV]", 10, 0, 200),
                ),
                "muon_pt_triggered": hist.Hist(
                    "Events",
                    hist.Cat("dataset", "Dataset"),
                    hist.Bin("muon_pt_triggered", r"$Muon p_{T}$ [GeV]", 10, 0, 200),
                ),
                "MET": hist.Hist(
                    "Events",
                    hist.Cat("dataset", "Dataset"),
                    hist.Bin("MET", r"$p_{T}^{miss}$ [GeV]", 50, 0, 200),
                ),
                "sumw": processor.defaultdict_accumulator(float),
            }
        )
        
    @property
    def accumulator(self):
        return self._accumulator
    
    def process(self, events):
        output = self.accumulator
        dataset = events.metadata['dataset']
        #if "Muon" not in dataset:
        try:
            self.gensumweight = ak.sum(events.genWeight)
            output["sumw"][dataset] += ak.sum(events.genWeight)
        except:
            output["sumw"][dataset] = 1.0
        #output["sumw"][dataset] = 1.0
        
        muons = ak.zip({
            "pt": events.Muon.pt,
            "eta": events.Muon.eta,
            "phi": events.Muon.phi,
            "mass": events.Muon.mass,
            "mediumId": events.Muon.mediumId
        }, with_name="Momentum4D") 
        muon_triggered = muons[events.HLT.IsoMu24 == 1]
        muon_cut = (events.Muon.pt > 10) & \
            (abs(events.Muon.eta) <= 2.4) & \
            (events.Muon.mediumId == 1) 
        muons = muons[muon_cut]
        muon_triggered_cut = (muon_triggered.pt > 10) & \
            (abs(muon_triggered.eta) <= 2.4) & \
            (muon_triggered.mediumId == 1) 
        muon_triggered = muon_triggered[muon_triggered_cut]
        
        Ak4Jets = ak.zip({
            "pt": events.Jet.pt,
            "eta": events.Jet.eta,
            "phi": events.Jet.phi,
            "mass": events.Jet.mass,
        }, with_name="Momentum4D")
        #Ak4Jets = Ak4Jets[events.HLT.Mu45_eta2p1 == 1]#for 2016
        
        Jets_triggered = Ak4Jets[(events.HLT.PFHT1050 == 1) & (events.HLT.Mu50 == 1)]
        Ak4Jets = Ak4Jets[events.HLT.Mu50 == 1]
        Ak4JetCut = (Ak4Jets.pt > 30) & (abs(Ak4Jets.eta)<4.7)
        Ak4Jets = Ak4Jets[Ak4JetCut]
        Jets_triggeredCut = (Jets_triggered.pt > 30) & (abs(Jets_triggered.eta)<4.7)
        Jets_triggered = Jets_triggered[Jets_triggeredCut]
                
        # fill out hists

        output['ht_reco'].fill(ht_reco=ak.sum(Ak4Jets.pt,axis=-1), dataset=dataset)
        jet_trig = ak.to_numpy(ak.sum(Jets_triggered.pt,axis=-1),allow_missing=True)
        output['ht_reco_triggered'].fill(ht_reco_triggered=jet_trig, dataset=dataset)
        
        output['nmuons'].fill(nmuons=ak.num(muons, axis=-1), dataset=dataset)
        muons = muons[ak.num(muons, axis=-1)>0]
        output['muon_pt'].fill(muon_pt=ak.max(muons.pt, axis=-1), dataset=dataset)
        muon_trig = ak.to_numpy(ak.max(muon_triggered.pt, axis=-1),allow_missing=True)
        output['muon_pt_triggered'].fill(muon_pt_triggered=muon_trig, dataset=dataset)
        output['MET'].fill(MET=events.MET.pt, dataset=dataset)
                
        return output
        
    def postprocess(self, accumulator):
        return accumulator

In [3]:
def check_port(port):
    import socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind(("0.0.0.0", port))
        available = True
    except:
        available = False
    sock.close()
    return available

The following section defines additional parts of the slurm Dask job. Here we source the bashrc to prepare Conda. We also pass in the x509 proxy. In order to share the proxy across the SubMIT machines you should move your proxy to your HOME directory.

In [4]:
slurm_env = [
     'export XRD_RUNFORKHANDLER=1',
     'export XRD_STREAMTIMEOUT=10',
     f'source {os.environ["HOME"]}/.bashrc',
     f'conda activate dask',
     f'export X509_USER_PROXY={os.environ["HOME"]}/x509up_u206148',
     f'sleep $[ ( $RANDOM % 300 )  + 1 ]s'
]

extra_args=[
     "--output=dask_job_output_%j.out",
     "--error=dask_job_output_%j.err",
     "--partition=submit",
     "--clusters=submit",
]

In [5]:
n_port       = 6820
w_port       = 9765
cores        = 1
processes    = 1
memory       = "10 GB"
chunksize    = 100000
maxchunks    = None

The following sets up the processor and json file. If you want to change files you can simply modify the json file

In [6]:
# load samples
file = "../filelist/trig_2018.txt"
samples = []
with open(file, 'r') as stream:
    for sample in stream.read().split('\n'):
        if '#' in sample: continue
        if len(sample.split('/')) <= 1: continue
        sample_name = sample.split("/")[-1]
        samples.append(sample_name)

In [7]:
# load file names
samples_dict = {}
for sample_name in samples:
    
    # ignores SUEP files since we want to use another one
    if 'SUEP' in sample_name: continue
    
    input_list = "/home/tier3/cmsprod/catalog/t2mit/nanosu/A01/{}/RawFiles.00".format(sample_name)
    
    files = []
    Raw_list = open(input_list, "r")
    for i in Raw_list:
        file = i.split(" ")[0]
        #file = file.replace("root://xrootd.cmsaf.mit.edu//store","/mnt/T2_US_MIT/hadoop/cms/store")
        files.append(file)
    
    samples_dict[sample_name] = files

In [8]:
# load SUEP files
suepDir = "/work/submit/freerc/Carlos/"
suep_dict = {}
for sample in os.listdir(suepDir):
    #if "M125" not in sample: continue
    if "ggHpythia_generic" not in sample: continue
    mass = sample.split("_")[2]
    suep_dict.update({mass: [suepDir + sample + "/GEN/total.root"]})

In [9]:
# combine QCD, data, and SUEP files
process_dict = {}
for key in list(samples_dict.keys()):
    process_dict.update({key:samples_dict[key]})#Only take 10 files for now.
#process_dict = process_dict | suep_dict

In [10]:
#process_dict

In [11]:
# cross section
xsections = {}
for sample in list(process_dict.keys()):
    xsection = 1.0
    if 'QCD' in sample:
        with open('../data/xsections_{}.json'.format('2018')) as file:
            MC_xsecs = json.load(file)
            try:
                xsection *= MC_xsecs[sample]["xsec"]
                xsection *= MC_xsecs[sample]["kr"]
                xsection *= MC_xsecs[sample]["br"]
            except:
                print("WARNING: I did not find the xsection for that MC sample. Check the dataset name and the relevant yaml file")
                print(sample)
        xsections.update({sample:xsection})
    else:
        xsections.update({sample:xsection})

The next section forms the Slurm Cluster. You can set up various parameters of the cluster here.

In [None]:
while not check_port(n_port):
    time.sleep(5)

import socket
cluster = SLURMCluster(
        queue='all',
        project="SUEP_Slurm",
        cores=4,
        processes=processes,
        memory=memory,
        #retries=10,
        walltime='00:30:00',
        scheduler_options={
              'port': n_port,
              'dashboard_address': 8000,
              'host': socket.gethostname()
        },
        job_extra=extra_args,
        env_extra=slurm_env,
)

In [None]:
cluster.adapt(minimum=1, maximum=10)
client = Client(cluster)
print(client)

In [None]:
#processor_instance = Simple_Process(isMC=1, era='2018', sample='test')
#output = processor.run_uproot_job(
#    process_dict,
#    treename='Events',
#    processor_instance=processor_instance,
#    executor=processor.futures_executor,
#    executor_args={'workers': 30,
#            'schema': processor.NanoAODSchema,
#            'xrootdtimeout': 30,
#        },
#    chunksize=100000)

## Running the processor
Now we will run the code with a performance report. This will analyze all of the input ROOT files and will store the histograms in output. Then we can analyze the output and make plots.

In [None]:
processor_instance = Simple_Process(isMC=1, era='2018', sample='test')
with performance_report(filename="dask-report.html"):
    output = processor.run_uproot_job(process_dict,
             treename='Events',
             processor_instance=processor_instance,
             executor=processor.dask_executor,
             executor_args={
                           'client': client,
                           'skipbadfiles': True,
                           'schema': processor.NanoAODSchema,
                           'xrootdtimeout': 30,
                           'retries': 10,
                           },
             chunksize=100000,
             maxchunks=maxchunks)

In [None]:
#client.cancel(cluster)

# just in case this isn't working as expected
#coffea.util.save(output, "unscaled_output.coffea")

# calculate normalization
scales = {} 
for dataset in output["sumw"]:
    xsec = xsections[dataset]
    scale = xsec / output["sumw"][dataset]
    if "Muon"  in dataset: scale = 1.0
    scales.update({dataset: scale})

    
# apply normalization to all histograms
for key in list(output.keys()):
    if key.lower() == 'sumw': continue
    print(key)
    output[key].scale(scales, axis='dataset')

coffea.util.save(output, "output.coffea")

### We can make some plots here too! But most of the analysis is in Dask_analysis.ipynb

In [None]:
from coffea.hist import plot
import matplotlib.pyplot as plt
import mplhep as hep

fig = plt.figure()
ax = fig.subplots()

#hep.cms.label(data=False, year='2018')
hep.style.use("ROOT")
# {"ALICE" | "ATLAS" | "CMS" | "LHCb1" | "LHCb2"}
hep.cms.label(data=False)

_ = ax.set_yscale('log')
plot.plot1d(output['ht_reco']["QCD_Pt*"], ax=ax, clear=False, stack=True)
_ = ax.set_xlim(0,2500)
_ = ax.set_ylim(0, 10000000000)
ax.get_legend().remove()

In [None]:
fig = plt.figure()
ax = fig.subplots()

#hep.cms.label(data=False, year='2018')
hep.style.use("ROOT")
# {"ALICE" | "ATLAS" | "CMS" | "LHCb1" | "LHCb2"}
hep.cms.label(data=False)

_ = ax.set_yscale('log')
plot.plot1d(output['ht_reco_triggered']["*Muon*"], ax=ax, clear=False, stack=True)
_ = ax.set_xlim(0,2500)
_ = ax.set_ylim(0, 10000000000)
ax.get_legend().remove()

In [None]:
val_trig = 0
val = 0
var_trig = 0
var = 0
i = 0
for sample in output['ht_reco_triggered'].project('dataset').values():
    if 'QCD_Pt' not in sample[0]: continue
    bh_trig = output['ht_reco_triggered'][sample[0]].to_boost()
    bh = output['ht_reco'][sample[0]].to_boost()
    if i == 0:
        bins_use = bh_trig.axes.edges[1][0]
        val_trig = bh_trig.values()[0]
        val= bh.values()[0]
        var_trig = bh_trig.variances()[0]
        var= bh.variances()[0]
    else:
        val_trig += bh_trig.values()[0]
        val += bh.values()[0]
        var_trig += bh_trig.variances()[0]
        var += bh.variances()[0]
    i += 1

#val_trig = np.array([ x+y for x,y in zip(val_trig[0::2], val_trig[1::2]) ])
#val = np.array([ x+y for x,y in zip(val[0::2], val[1::2]) ])
#var_trig = np.array([ x+y for x,y in zip(var_trig[0::2], var_trig[1::2]) ])
#var = np.array([ x+y for x,y in zip(var[0::2], var[1::2]) ])
x = val_trig/val
x2 = np.nan_to_num(x)*100

In [None]:
val_trig_data = 0
val_data = 0
var_trig_data = 0
var_data = 0
i = 0
for sample in output['ht_reco_triggered'].project('dataset').values():
    if 'Muon' not in sample[0]: continue
    bh_trig_data = output['ht_reco_triggered'][sample[0]].to_boost()
    bh_data = output['ht_reco'][sample[0]].to_boost()
    if i == 0:
        val_trig_data = bh_trig_data.values()[0]
        val_data= bh_data.values()[0]
        var_trig_data = bh_trig_data.variances()[0]
        var_data= bh_data.variances()[0]
    else:
        val_trig_data += bh_trig_data.values()[0]
        val_data += bh_data.values()[0]
        var_trig_data+= bh_trig_data.variances()[0]
        var_data += bh_data.variances()[0]
    i += 1
    
    
#val_trig_data = np.array([ x+y for x,y in zip(val_trig_data[0::2], val_trig_data[1::2]) ])
#val_data = np.array([ x+y for x,y in zip(val_data[0::2], val_data[1::2]) ])
#var_trig_data = np.array([ x+y for x,y in zip(var_trig_data[0::2], var_trig_data[1::2]) ])
#var_data = np.array([ x+y for x,y in zip(var_data[0::2], var_data[1::2]) ])
x_data = val_trig_data/val_data
x_data2 = np.nan_to_num(x_data)*100

In [None]:
#bins_use=bins_use[0::2]
import hist
MC_err = hist.intervals.ratio_uncertainty(num=val_trig,denom=val,uncertainty_type='efficiency')
data_err = hist.intervals.ratio_uncertainty(num=val_trig_data,denom=val_data,uncertainty_type='efficiency')

In [None]:
binc = np.array([ 0.5*(bins_use[i]+bins_use[i+1])for i in range(bins_use.shape[0]-1)])
xerr = np.diff(bins_use)*0.5

In [None]:
fig = plt.figure()
ax = fig.subplots()

hep.cms.label(data=False, year='SingleMuon 2018')
hep.style.use("ROOT")
# {"ALICE" | "ATLAS" | "CMS" | "LHCb1" | "LHCb2"}
hep.cms.label(data=False)
binc = np.array([ 0.5*(bins_use[i]+bins_use[i+1])for i in range(bins_use.shape[0]-1)])
plt.errorbar(binc,x_data2, xerr=xerr,yerr=[data_err[0],data_err[1]],color="red", fmt='o')
plt.errorbar(binc,x2, xerr=xerr,yerr=[MC_err[0],MC_err[1]],color="black", fmt='o')
#plt.axvline(x=1200, color='gray',ls='--', lw=2)
_ = ax.set_ylim(0.001, 130)
_ = ax.set_ylabel("Efficiency")
_ = ax.set_xlabel(r"AK4 $H_{T}$ [GeV]")
plt.rcParams['text.usetex'] = True
labels = [r"QCD MC",r"Data"]
leg = ax.legend(labels=labels)
plt.axvline(x=1200, color='gray',ls='--', lw=2)

In [None]:
z=np.nan_to_num(x_data2/x2)
z=np.clip(z,0,15)
z_up = np.nan_to_num((x_data2+data_err[0])/(x2-MC_err[1])) - z
z_up = np.clip(z_up,0,15)
z_down = z - np.nan_to_num((x_data2-data_err[1])/(x2+MC_err[0]))
z_down = np.clip(z_down,0,15)

In [None]:
#z=np.clip(z,0,1.5)
#z_up=np.clip(z_up,0,1.5)
#_down=np.clip(z_down,0,1.5)

In [None]:
fig = plt.figure()
ax = fig.subplots()

hep.cms.label(data=False, year='SingleMuon 2018')
hep.style.use("ROOT")
# {"ALICE" | "ATLAS" | "CMS" | "LHCb1" | "LHCb2"}
hep.cms.label(data=False)
plt.errorbar(binc,z, xerr=xerr,yerr=[z_up,z_down],color="red", fmt='o')
_ = ax.set_ylim(0.001, 1.5)
_ = ax.set_ylabel("Scale Factor")
_ = ax.set_xlabel(r"AK4 $H_{T}$ [GeV]")
plt.axvline(x=1200, color='gray',ls='--', lw=2)

In [None]:
import uproot
outfile = uproot.recreate("trigSF_2018.root")

In [None]:
z_var = np.maximum(z_up,z_down)

In [None]:
z_var

In [None]:
import boost_histogram as bh
newhist = bh.Histogram(bh.axis.Variable(bins_use),storage=bh.storage.Weight())
newhist[:] = np.stack([z, z_var], axis=-1)

In [None]:
newhist

In [None]:
#outfile['efficiency_Data'] = x_data2
#outfile['efficiency_MC'] = x_2
outfile['TriggerSF'] = newhist