In [1]:
import pickle
import awkward as ak
import hist as hs
import numpy as np

from coffea import processor
from coffea.nanoevents.methods import vector, candidate

from coffea.nanoevents import BaseSchema
import mplhep as hep

# coffea processor
The core of the algorithm is in the `process` method.

In [2]:
class CSCprocessor(processor.ProcessorABC):
    """Coffea processor producing a CSC rechit-cluster cutflow and histograms.

    For every chunk of events, `process`:
      1. builds per-muon branches from the lepton branches, applies the muon
         selection step by step and keeps the leading (highest-pt) muon,
      2. attaches cluster-to-LLP (signal only) and cluster-to-leading-muon
         angular separations,
      3. applies the cluster cuts cumulatively, recording the number of
         surviving events after each cut together with one histogram (and the
         raw values) per plotted variable and cut.
    """

    # (lepton branch, per-muon branch, leading-muon branch) triples that are
    # carried through the muon selection for every dataset.
    _MUON_BRANCHES = (
        ('lepE', 'muonE', 'leadMuonE'),
        ('lepPt', 'muonPt', 'leadMuonPt'),
        ('lepEta', 'muonEta', 'leadMuonEta'),
        ('lepPhi', 'muonPhi', 'leadMuonPhi'),
        ('lepPdgId', 'muonPdgId', 'leadMuonPdgId'),
        ('lepDZ', 'muonDZ', 'leadMuonDZ'),
        ('lepLooseId', 'muonLooseId', 'leadMuonLooseId'),
        ('lepTightId', 'muonTightId', 'leadMuonTightId'),
    )

    # Additional triples that are only read when 'mPhi0p3' is in the dataset name.
    _MUON_BRANCHES_MPHI0P3 = (
        ('lepMuonType', 'muonType', 'leadMuonType'),
        ('lepMuonQuality', 'muonQuality', 'leadMuonQuality'),
        ('lepMuon_passHLTFilter', 'muon_passHLTFilter', 'leadMuon_passHLTFilter'),
    )

    @staticmethod
    def _four_vectors(pt, eta, phi, energy):
        """Zip kinematic arrays into coffea Lorentz-vector records.

        The record is tagged `PtEtaPhiELorentzVector` (fields pt/eta/phi/energy)
        because the fourth component supplied by the callers is an energy,
        not a mass.
        """
        return ak.zip(
            {
                'pt': pt,
                'eta': eta,
                'phi': phi,
                'energy': energy,
            },
            with_name='PtEtaPhiELorentzVector',
            behavior=vector.behavior,
        )

    def _delta_cls(self, events, others):
        """Angular separations between every CSC cluster and every `others` entry.

        Parameters
        ----------
        events : event array providing `cscRechitClusterEta` / `cscRechitClusterPhi`.
        others : vector records (see `_four_vectors`) to pair with the clusters.

        Returns
        -------
        tuple
            `(deltaR, deltaEta, deltaPhi)`; each is the nested cartesian product
            flattened along axis 2. `deltaEta` is an absolute value, `deltaPhi`
            is signed and wrapped through `arctan2(sin, cos)`.
        """
        # Clusters only contribute a direction, so pt and energy are set to zero.
        zeros = ak.zeros_like(events.cscRechitClusterPhi)
        clusters = self._four_vectors(
            zeros, events.cscRechitClusterEta, events.cscRechitClusterPhi, zeros
        )
        pairs = ak.cartesian({'cls': clusters, 'obj': others}, nested=True)

        delta_r = pairs.cls.delta_r(pairs.obj)
        delta_eta = abs(pairs.cls.eta - pairs.obj.eta)
        raw_dphi = pairs.cls.phi - pairs.obj.phi
        delta_phi = np.arctan2(np.sin(raw_dphi), np.cos(raw_dphi))

        return tuple(
            ak.flatten(separation, axis=2)
            for separation in (delta_r, delta_eta, delta_phi)
        )

    def delta_cls_gLLP(self, events):
        """Return `(deltaR, deltaEta, deltaPhi)` between CSC clusters and gen-level LLPs."""
        gLLPs = self._four_vectors(
            events.gLLP_pt, events.gLLP_eta, events.gLLP_phi, events.gLLP_e
        )
        return self._delta_cls(events, gLLPs)

    def delta_cls_leadmuon(self, events):
        """Return `(deltaR, deltaEta, deltaPhi)` between CSC clusters and the leading muon.

        Requires the `leadMuon*` branches that `process` attaches to `events`.
        """
        leadMuons = self._four_vectors(
            events.leadMuonPt, events.leadMuonEta, events.leadMuonPhi, events.leadMuonE
        )
        return self._delta_cls(events, leadMuons)

    def _select_muons(self, events, dataset, counts):
        """Build muon branches, apply the muon selection and pick the leading muon.

        After every selection step, events left without any muon are dropped
        and the number of remaining events is stored in `counts`.

        Returns the filtered event array carrying `muon*` and `leadMuon*` branches.
        """
        with_extras = 'mPhi0p3' in dataset
        branches = self._MUON_BRANCHES
        if with_extras:
            branches = branches + self._MUON_BRANCHES_MPHI0P3

        def has_muons(evts):
            return ak.count(evts.muonPt, axis=1) > 0

        def keep_muons(evts, mask):
            """Apply a per-muon mask to all muon branches, then drop muon-less events."""
            for _lep, muon_name, _lead in branches:
                evts[muon_name] = evts[muon_name][mask]
            return evts[has_muons(evts)]

        # Derive the muon branches from the lepton branches (|pdgId| == 13).
        is_muon = abs(events.lepPdgId) == 13
        for lep_name, muon_name, _lead in branches:
            events[muon_name] = events[lep_name][is_muon]
        events = events[has_muons(events)]
        counts['numEvents_|muonId| == 13'] = len(events.muonPt)

        if with_extras:
            # Keep muons for which any of the filter entries 60..67 is set.
            events = keep_muons(
                events, ak.any(events.muon_passHLTFilter[:, :, range(60, 68)], axis=2)
            )
            counts['numEvents_muonHLTReq'] = len(events.muonPt)

        events = keep_muons(events, abs(events.muonEta) < 1.5)
        counts['numEvents_|muonEta| < 1.5'] = len(events.muonPt)
        events = keep_muons(events, events.muonPt > 7)
        counts['numEvents_muonPt > 7'] = len(events.muonPt)

        if with_extras:
            events = keep_muons(events, events.muonQuality >= 2**25)
            counts['numEvents_soft_muon_ID'] = len(events.muonPt)

        # Finally keep only the leading muon; `[:, 0]` also resolves pt ties.
        leadcut = ak.max(events.muonPt, axis=1, mask_identity=True) == events.muonPt
        for _lep, muon_name, lead_name in branches:
            events[lead_name] = events[muon_name][leadcut][:, 0]
        counts['numEvents_leadMuon_cut'] = len(events.leadMuonPt)
        return events

    @staticmethod
    def _cluster_cuts(events, is_signal):
        """Return the ordered mapping `cut label -> boolean mask`.

        The order defines the cutflow. Signal-only cuts are replaced by an
        all-true per-event mask for other datasets.
        """
        # All-true vector with the same shape as `nCscRechitClusters`.
        dummy = ak.values_astype(ak.ones_like(events.nCscRechitClusters), 'bool')
        abs_eta = abs(events.cscRechitClusterEta)
        n_station = events.cscRechitClusterNStation10
        avg_station = abs(events.cscRechitClusterAvgStation10)

        # NOTE(review): a few labels do not exactly mirror the comparison that is
        # applied (e.g. 'cls_size > 130' uses >=, the time cuts use inclusive
        # bounds and the time-spread cut takes no absolute value). Labels and
        # expressions are kept as they were so that output keys and event counts
        # do not change.
        return {
            'posttrigger': dummy,
            'llp_acc': (events.gLLP_csc == 1) if is_signal else dummy,
            'num Cluster > 0': (events.nCscRechitClusters > 0),
            'dR_gllp_cls < .4': (events.cscRechitCluster_llp_deltaR < .4) if is_signal else dummy,
            'dR_lmuon_cls > .8': (events.cscRechitCluster_leadmuon_deltaR > .8),
            'ME1112_veto': ((events.cscRechitClusterNRechitChamberPlus11 <= 0)
                            & (events.cscRechitClusterNRechitChamberMinus11 <= 0)
                            & (events.cscRechitClusterNRechitChamberPlus12 <= 0)
                            & (events.cscRechitClusterNRechitChamberMinus12 <= 0)),
            're12_veto': (events.cscRechitCluster_match_RE12_0p4 == 0),
            'mb1_veto': (events.cscRechitCluster_match_MB1Seg_0p4 == 0),
            'rb1_veto': (events.cscRechitCluster_match_RB1_0p4 == 0),
            'muon_veto_pt < 20': (events.cscRechitClusterMuonVetoPt < 20),
            '-5 < cls_time < 12.5': ((events.cscRechitClusterTimeWeighted <= 12.5)
                                     & (events.cscRechitClusterTimeWeighted >= -5)),
            '|cls_timeSpread| < 20': (events.cscRechitClusterTimeSpreadWeightedAll <= 20),
            '|cls_eta| < 1.9': (abs_eta < 1.9),
            'cut_based_ID': (((n_station > 1) & (abs_eta < 1.9))
                             | ((n_station == 1) & (avg_station == 4) & (abs_eta < 1.8))
                             | ((n_station == 1) & (avg_station == 3) & (abs_eta < 1.6))
                             | ((n_station == 1) & (avg_station == 2) & (abs_eta < 1.6))
                             | ((n_station == 1) & (avg_station == 1) & (abs_eta < 1.1))),
            'cls_size > 130': (events.cscRechitClusterSize >= 130),
        }

    @staticmethod
    def _plot_variables(dataset, is_signal):
        """Return `branch name -> [nbins, low, high, transform]` for all plotted variables.

        `transform` is applied to the selected values before they are stored
        and histogrammed.
        """
        def identity(values):
            return values

        bins = 30

        # Cluster-level variables: must have the same shape as any csc variable.
        cscVars = {
            'cscRechitClusterNRechitChamberPlus11':  [bins, 0, 10, identity],
            'cscRechitClusterNRechitChamberMinus11': [bins, 0, 10, identity],
            'cscRechitClusterNRechitChamberPlus12':  [bins, 0, 10, identity],
            'cscRechitClusterNRechitChamberMinus12': [bins, 0, 10, identity],
            'cscRechitCluster_match_RE12_0p4':       [bins, 0, 10, identity],
            'cscRechitCluster_match_MB1Seg_0p4':     [bins, 0, 10, identity],
            'cscRechitCluster_match_RB1_0p4':        [bins, 0, 10, identity],
            'cscRechitClusterMuonVetoPt':            [bins, 0, 100, identity],
            'cscRechitClusterTimeWeighted':          [bins, -20, 20, identity],
            'cscRechitClusterTimeSpreadWeightedAll': [bins, 0, 30, identity],
            'cscRechitClusterEta':                   [bins, 0, 3, abs],
            'cscRechitClusterSize':                  [bins, 50, 300, identity],
            'cscRechitClusterNStation10':            [bins, 0, 5, identity],
            'cscRechitClusterAvgStation10':          [bins, 0, 5, abs],
        }

        if 'background' in dataset:  # this is explicitly to protect from unblinding data
            cscVars['cscRechitClusterSize'] = [int(bins / 5), 50, 100, identity]

        # NOTE(review): deltaPhi is signed (arctan2 output) but is binned on
        # [0, 5] without abs, so negative values fall outside the axis range.
        if is_signal:
            cscVars['cscRechitCluster_llp_deltaR'] = [bins, 0, 5, identity]
            cscVars['cscRechitCluster_llp_deltaEta'] = [bins, 0, 5, abs]
            cscVars['cscRechitCluster_llp_deltaPhi'] = [bins, 0, 5, identity]

        cscVars['cscRechitCluster_leadmuon_deltaR'] = [bins, 0, 5, identity]
        cscVars['cscRechitCluster_leadmuon_deltaEta'] = [bins, 0, 5, abs]
        cscVars['cscRechitCluster_leadmuon_deltaPhi'] = [bins, 0, 5, identity]

        # Event-level variables: must be flat with one entry per event.
        eventVars = {
            'metEENoise': [bins, 0, 100, identity],
            'gLLP_ctau': [bins, 0, 1e3, identity],
        }

        if is_signal:
            eventVars['gLLP_decay_vertex_z'] = [2 * bins, 0, 1200, abs]
            eventVars['gLLP_decay_vertex_z_matched'] = [2 * bins, 0, 1200, abs]
            eventVars['gLLP_e'] = [bins, 0, 100, identity]
            eventVars['gLLP_e_matched'] = [bins, 0, 100, identity]

        return cscVars | eventVars

    @staticmethod
    def _fill_outputs(events, dataset, cscCuts, Vars, out):
        """Apply the cuts cumulatively and fill event counts, raw values and histograms.

        Raises
        ------
        ValueError
            If the cumulative cut mask is neither per-event (depth 1) nor
            per-cluster (depth 2), since no event count can be derived from it.
        """
        counts_and_hists = out[dataset]
        cut_log = out[f'{dataset}_cuts']
        var_log = out[f'{dataset}_vars']

        # Layout depth of every plotted branch; it does not depend on the cut.
        var_depths = {var: events[var].layout.minmax_depth for var in Vars}

        bigCut = next(iter(cscCuts.values()))  # start from the first cut
        for cut, mask in cscCuts.items():
            cut_log[cut] = 1
            bigCut = bigCut & mask

            cut_depth = bigCut.layout.minmax_depth
            if cut_depth == (2, 2):
                # A per-cluster mask keeps an event if any cluster survives.
                eventCut = ak.any(bigCut, axis=1)
            elif cut_depth == (1, 1):
                eventCut = bigCut
            else:
                raise ValueError(
                    f"cut '{cut}' produced a mask of unsupported depth {cut_depth}"
                )

            # ak.sum counts the True entries without a Python-level loop.
            counts_and_hists[f'numEvents_{cut}'] = int(ak.sum(eventCut))

            for var, (nbins, low, high, transform) in Vars.items():
                var_log[var] = 1

                # Event-level variables cannot be indexed by a per-cluster mask.
                if cut_depth == (2, 2) and var_depths[var] == (1, 1):
                    selection = eventCut
                else:
                    selection = bigCut

                data = transform(ak.flatten(events[var][selection], axis=None))
                key = f'{var} with {cut}'
                var_log[key] = data.to_list()
                if 'gLLP_e' in var:
                    # Log-spaced axis; the lower edge is shifted by one to stay positive.
                    counts_and_hists[key] = hs.Hist.new.Reg(
                        nbins, low + 1, high, name=var, label=var,
                        transform=hs.axis.transform.log,
                    ).Double()
                else:
                    counts_and_hists[key] = hs.Hist.new.Reg(
                        nbins, low, high, name=var, label=var
                    ).Double()
                counts_and_hists[key].fill(data)

    def process(self, events):
        """Run the selection on one chunk of events.

        Parameters
        ----------
        events : event array handed over by coffea; `events.metadata['dataset']`
            names the dataset and steers the signal-only ('Phi') and
            'mPhi0p3'-only parts of the selection.

        Returns
        -------
        dict
            Three entries keyed by `dataset`, `f'{dataset}_cuts'` and
            `f'{dataset}_vars'`: event counts and histograms, the applied cut
            labels, and the plotted variables with their selected raw values.
        """
        signame = 'Phi'

        dataset = events.metadata['dataset']
        is_signal = signame in dataset
        out = {
            dataset: {},
            f'{dataset}_cuts': {},
            f'{dataset}_vars': {},
        }
        out[dataset]['numEvents_pretrigger'] = len(events)

        # >>> some preprocessing >>>
        events = self._select_muons(events, dataset, out[dataset])

        # Add deltaR-type branches between clusters and the LLP / leading muon.
        if is_signal:
            cls_llp_deltaR, cls_llp_deltaEta, cls_llp_deltaPhi = self.delta_cls_gLLP(events)
            events['cscRechitCluster_llp_deltaR'] = cls_llp_deltaR
            events['cscRechitCluster_llp_deltaEta'] = cls_llp_deltaEta
            events['cscRechitCluster_llp_deltaPhi'] = cls_llp_deltaPhi

            # Mask out LLP quantities in events without a cluster within dR < 0.4.
            llp_matched = ak.any(cls_llp_deltaR < .4, axis=1)
            events['gLLP_decay_vertex_z_matched'] = events.gLLP_decay_vertex_z.mask[llp_matched]
            events['gLLP_e_matched'] = events.gLLP_e.mask[llp_matched]

        cls_leadmuon_deltaR, cls_leadmuon_deltaEta, cls_leadmuon_deltaPhi = self.delta_cls_leadmuon(events)
        events['cscRechitCluster_leadmuon_deltaR'] = cls_leadmuon_deltaR
        events['cscRechitCluster_leadmuon_deltaEta'] = cls_leadmuon_deltaEta
        events['cscRechitCluster_leadmuon_deltaPhi'] = cls_leadmuon_deltaPhi
        # <<< some preprocessing <<<

        cscCuts = self._cluster_cuts(events, is_signal)
        Vars = self._plot_variables(dataset, is_signal)
        self._fill_outputs(events, dataset, cscCuts, Vars, out)
        return out

    def postprocess(self, accumulator):
        """Return the merged accumulator unchanged."""
        return accumulator
    

# Runner
The raw ROOT files to analyse are listed here and run through the processor defined above, which turns them into histograms and cutflow counts.

In [9]:
# Remote (XRootD) directory holding the normalized signal MC ntuples.
# Local copies, when available, can be used by setting prefix = 'rootfiles/'.
prefix = 'root://cmsxrootd.fnal.gov//store/user/christiw/displacedJetMuonAnalyzer/bparking/V1p19/MC_Fall18/v1/v5/normalized/'

# Dataset name -> list of input files. Only the background (data) sample is
# enabled; uncomment a signal entry to process it as well.
fileset = {
    # 'PhiToPi0Pi0_mPhi0p3_ctau1000':
    #     [prefix + 'BToKPhi_MuonGenFilter_PhiToPi0Pi0_mPhi0p3_ctau1000_1pb_weighted.root'],
    # 'PhiToPi0Pi0_mPhi0p3_ctau300':
    #     [prefix + 'BToKPhi_MuonGenFilter_PhiToPi0Pi0_mPhi0p3_ctau300_1pb_weighted.root'],
    # 'PhiToPiPlusPiMinus_mPhi0p3_ctau300':
    #     [prefix + 'BToKPhi_MuonGenFilter_PhiToPiPlusPiMinus_mPhi0p3_ctau300_1pb_weighted.root'],
    # 'PhiToPiPlusPiMinus_mPhi0p3_ctau1000':
    #     [prefix + 'BToKPhi_MuonGenFilter_PhiToPiPlusPiMinus_mPhi0p3_ctau1000_1pb_weighted.root'],
    # 'PhiToPi0Pi0_mPhi1p0_ctau1000':
    #     ['root://cmsxrootd.fnal.gov//store/group/phys_exotica/delayedjets/displacedJetMuonAnalyzer/csc/V1p17/MC_Fall18/v2/v3/normalized/BToKPhi_MuonGenFilter_mPhi1p0_ctau1000_1pb_weighted.root'],
    'backgroundNew': [
        'root://cmsxrootd.fnal.gov//store/user/christiw/displacedJetMuonAnalyzer/bparking/V1p19/Data2018_UL/v6/normalized/ParkingBPH4_2018A_goodLumi.root',
    ],
}

# Run CSCprocessor over the "MuonSystem" tree of every file with three worker
# processes. For quick debugging runs, `maxchunks=1` / `chunksize=100000` can
# be passed to limit the amount of data that is read.
out = processor.run_uproot_job(
    fileset,
    treename="MuonSystem",
    processor_instance=CSCprocessor(),
    executor=processor.futures_executor,
    executor_args={"schema": BaseSchema, "workers": 3},
)

Output()

Traceback (most recent call last):
  File "/afs/cern.ch/user/a/aaportel/mambaforge/envs/anal/lib/python3.9/site-packages/coffea/processor/executor.py", line 781, in _processwith
    merged = _watcher(FH, self, reducer, pool)
  File "/afs/cern.ch/user/a/aaportel/mambaforge/envs/anal/lib/python3.9/site-packages/coffea/processor/executor.py", line 401, in _watcher
    batch = FH.fetch(len(FH.completed))
  File "/afs/cern.ch/user/a/aaportel/mambaforge/envs/anal/lib/python3.9/site-packages/coffea/processor/executor.py", line 285, in fetch
    raise bad_futures[0].exception()
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.


BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

# Saver
A dictionary of histogram objects as well as some other useful info is saved into a pickle file.

In [None]:
# Persist the processor output (histograms, event counts and raw values) to disk.
filename = 'bkg_outfile.pickle'
# The context manager closes the file even if pickling raises.
with open(filename, 'wb') as outfile:
    pickle.dump(out, outfile)