# Scaling QCD Jets with Coffea Processor

In [1]:
import awkward as ak
import numpy as np
import coffea
import uproot
import hist
import vector
from coffea import util, processor
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema, BaseSchema
from distributed.diagnostics.plugin import UploadDirectory
import matplotlib.pyplot as plt
from collections import defaultdict
import os
import pickle
from coffea.analysis_tools import PackedSelection
from dask.distributed import Client
from smp_utils import *

In [2]:
class QCDProcessor(processor.ProcessorABC):
    """Coffea processor that histograms the reco/gen jet pT response.

    For every chunk of events, ``process``:

    1. keeps events whose generated and reconstructed primary-vertex ``z``
       differ by less than 0.2,
    2. keeps reconstructed jets with ``jetId > 0`` and drops events left
       without any such jet,
    3. matches each of the first three gen jets to its nearest selected
       reco jet (``threshold=0.2``),
    4. keeps events in which more than two gen jets found a match, and
    5. fills ``reco_pt / gen_pt`` (values ``<= 2`` only) into the
       ``responses_histogram``, labelled by dataset name.

    The returned accumulator is a dict with the keys
    ``"responses_histogram"`` (a ``hist.Hist``) and ``"cutflow"`` (a dict
    mapping dataset name to a ``defaultdict(int)``; no counters are
    filled at the moment).
    """

    def __init__(self):

        ###################################
        ### Defining the Histogram Axes ###
        ###################################

        binning = util_binning()

        # Only the axes needed by the active histogram are kept; the
        # commented-out extensions below would additionally need
        # binning.eta_axis, binning.phi_axis and binning.pt_axis.
        self._dataset_axis = binning.dataset_axis
        self._frac_axis = binning.frac_axis

        #rho_axis = binning.rho_axis
        #npvs_axis = binning.npvs_axis
        #npu_axis = binning.npu_axis

        ######################################
        ### Defining the Histogram Objects ###
        ######################################

        # Empty template of the output; ``process`` never fills this
        # object, it builds a fresh set of outputs for every chunk.
        self.hists = self._make_output()

    def _make_output(self):
        """Return a new, empty set of output objects."""
        h_responses_histogram = hist.Hist(self._dataset_axis, self._frac_axis)#, eta_axis, phi_axis, pt_axis)#, storage="weight", label="Counts")
        #h_corrections_histogram = hist.Hist(dataset_axis, rho_axis, npvs_axis, npu_axis, storage="weight", label="Counts")

        return {
            "responses_histogram": h_responses_histogram,
            #"corrections_histogram":h_corrections_histogram,
            "cutflow": {},
        }

    @property
    def accumulator(self):
        """Empty accumulator template (same layout as ``process`` output)."""
        return self.hists

    def process(self, events):
        """Select jets in one chunk of events and fill the response histogram.

        Args:
            events: chunk of events; must provide ``metadata['dataset']``
                and the ``GenVtx``, ``PV``, ``Jet`` and ``GenJet``
                collections read below.

        Returns:
            A freshly created dict with the filled ``responses_histogram``
            and the ``cutflow`` entry for this chunk's dataset.
        """

        dataset = events.metadata['dataset']
        print(f"Processing ----- {dataset}")

        # Build the outputs per chunk instead of filling shared instance
        # state: if the same processor instance handles several chunks,
        # returning one shared, ever-growing object would make the merged
        # result count earlier chunks more than once.
        output = self._make_output()
        output["cutflow"][dataset] = defaultdict(int)

        ####################################################
        ### Applying Cuts to Jet Kinematic Distributions ###
        ####################################################

        ### Vertex and JetId Masks

        vtx_mask = np.abs(events.GenVtx.z - events.PV.z) < 0.2
        events = events[vtx_mask]

        # Keep the ID-selected jets in a local variable. Assigning them back
        # with ``events.Jet = ...`` is not a reliable way to replace a field,
        # and the selection would not survive re-slicing ``events``.
        jets = events.Jet[events.Jet.jetId > 0]
        has_jet = ak.num(jets, axis=1) > 0
        events = events[has_jet]
        jets = jets[has_jet]

        ### Keeping three leading jets

        gen_jets = events.GenJet[:, :3]
        reco_jets = gen_jets.nearest(jets, threshold=0.2)
        pt_response = reco_jets.pt / gen_jets.pt

        ### Final masks

        # Drop gen jets without a matched reco jet
        # (``nearest`` yields None for those).
        sel_1 = ~ak.is_none(reco_jets, axis=1)

        reco_jets = reco_jets[sel_1]
        gen_jets = gen_jets[sel_1]
        pt_response = pt_response[sel_1]

        # Keep only events in which more than two gen jets were matched.
        sel_2 = ak.num(pt_response) > 2

        reco_jets = reco_jets[sel_2]
        gen_jets = gen_jets[sel_2]
        pt_response = pt_response[sel_2]
        pt_response = pt_response[pt_response <= 2]

        ###########################################
        ### Applying Cuts to Pileup Observables ###
        ###########################################

        ### Mask application

        #n_reco_vtx = events.PV.npvs
        #n_pileup = events.Pileup.nPU
        #rho = events.fixedGridRhoFastjetAll
        #pu_nTrueInt = events.Pileup.nTrueInt

        #n_reco_vtx = n_reco_vtx[sel_2]
        #n_pileup = n_pileup[sel_2]
        #rho = rho[sel_2]
        #pu_nTrueInt = pu_nTrueInt[sel_2]

        ### Broadcasting across reco_jets

        #n_reco_vtx = ak.broadcast_arrays(n_reco_vtx, reco_jets.pt)[0]
        #n_pileup = ak.broadcast_arrays(n_pileup, reco_jets.pt)[0]
        #rho = ak.broadcast_arrays(rho, reco_jets.pt)[0]
        #pu_nTrueInt = ak.broadcast_arrays(pu_nTrueInt, reco_jets.pt)[0]

        ### Pileup Weights

        #puWeight = GetPUSF(dataset, np.array(ak.flatten(pu_nTrueInt)))

        ##############################
        ### Filling the Histograms ###
        ##############################

        output["responses_histogram"].fill(dataset=dataset, frac=ak.ravel(pt_response))#, eta=ak.ravel(gen_jets.eta), phi=ak.ravel(gen_jets.phi), pt=ak.ravel(gen_jets.pt))#, weight=puWeight)

        #output["corrections_histogram"].fill(dataset=dataset, npvs=ak.ravel(n_reco_vtx), npu=ak.ravel(n_pileup), rho=ak.ravel(rho), weight=puWeight)

        return output

    def postprocess(self, accumulator):
        """Return the merged accumulator unchanged."""
        return accumulator

In [3]:
# Driver cell: build the fileset from the per-era sample lists, run the
# QCDProcessor over it and pickle the merged output.

# XRootD prefix put in front of every path read from the sample lists.
prependstr = "root://cmsxrootd.fnal.gov/"
#prependstr = "root://xcache/"

filedir = "samples/"

filestr = "flatPU_JMENano_%s.txt"

eras = ['2016']#, '2017', '2018']

fname_out = "pkl_files/QCD_pt_response_NEW.pkl"

# Make sure the output directory exists *before* the processing starts, so
# a missing directory cannot discard the result at the final dump.
os.makedirs(os.path.dirname(fname_out), exist_ok=True)

fileset = {}

for era in eras:
    filename = filedir + filestr % (era)
    with open(filename) as f:
        # One path per line; blank lines and '#' comment lines are skipped
        # (a blank line would otherwise become a bare redirector URL).
        entries = (line.strip() for line in f)
        fileset[era] = [
            prependstr + entry
            for entry in entries
            if entry and not entry.startswith("#")
        ]

#client = Client("tls://192.168.55.113:8786")

run = processor.Runner(
    executor=processor.FuturesExecutor(compression=None, workers=4),
    schema=NanoAODSchema,
    skipbadfiles=True,
)

#run = processor.Runner(
#    executor = processor.DaskExecutor(client=client),
#    schema=NanoAODSchema,
#    chunksize = 1000000,
#    maxchunks = None,
#    skipbadfiles=True
#)

out = run(
    fileset=fileset,
    treename="Events",
    processor_instance=QCDProcessor()
)

with open(fname_out, "wb") as f:
    pickle.dump(out, f)

Output()

in file root://cmsxrootd.fnal.gov//store/mc/RunIISummer20UL16NanoAODAPVv9/QCD_Pt-15to7000_TuneCP5_Flat2018_13TeV_pythia8/NANOAODSIM/FlatPU0to75_106X_mcRun2_asymptotic_preVFP_v11-v1/130000/9E60AAAD-1494-AA4D-B596-D90933503361.root


Output()

Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing

in file root://cmsxrootd.fnal.gov//store/mc/RunIISummer20UL16NanoAODAPVv9/QCD_Pt-15to7000_TuneCP5_Flat2018_13TeV_pythia8/NANOAODSIM/FlatPU0to75_106X_mcRun2_asymptotic_preVFP_v11-v1/130000/A7CB57FE-3CB7-7948-BC8C-9CC683446AC3.root


Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016




Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016




Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
Processing ----- 2016
