In [1]:
%load_ext autoreload

In [2]:
%autoreload 1

In [3]:
%aimport event_selector
%aimport looper_utils
%aimport make_plots

In [4]:
import awkward as ak
import uproot
import numpy as np
from yahist import Hist1D, Hist2D

## Load data

In [5]:
%%bash
# List the available metadata files (chunklist.json is loaded in the next cell).
ls metadata/

chunklist.json
totalWeights.json
xsection.json


In [6]:
import json

# Maps each sample name to the list of chunks that `process` is mapped over.
with open("./metadata/chunklist.json") as chunklist_file:
    chunks = json.load(chunklist_file)
chunks.keys()

dict_keys(['EGamma_2018A', 'EGamma_2018B', 'EGamma_2018C', 'EGamma_2018D', 'DYJets', 'ttbar', 'ZG', 'WG', 'GJets_HT40To100', 'GJets_HT100To200', 'GJets_HT200To400', 'GJets_HT400To600', 'GJets_HT600ToInf', 'QCD_pT30To40', 'QCD_pT40ToInf', 'Diphoton', 'ZH', 'signal'])

## Process data

In [7]:
def process(args):
    """Run the event selection on one chunk and fill per-category histograms.

    Executed on dask workers (via ``client.map``), so every module it needs is
    imported inside the function.

    Parameters
    ----------
    args : sequence
        One entry of ``chunks[<sample>]``. ``args[0]`` is checked for the
        substring "EGamma" to decide whether the chunk is data. Presumably
        (file, entry_start, entry_stop) -- TODO confirm against
        ``event_selector.prepare_inputs``.

    Returns
    -------
    dict
        ``{category: {hist_name: histogram}}`` for the categories "dipho",
        "0lep_1tau", "0lep_2tau", "1lep_1tau", "2lep_0tau" and "1lep_0tau".
    """
    import awkward as ak

    import event_selector
    import looper_utils
    import make_plots

    obj_list = ["electron", "muon", "tau", "photon", "others"]

    isData = "EGamma" in args[0]
    events = event_selector.prepare_inputs(args, obj_list, isData=isData)
    weights = events.genWeight
    mgg = events.ggMass

    ## need to change gHidx
    g_Hidx = event_selector.get_gHidx(args).gHidx

    # Selected photons; taus and leptons below are cleaned by dR (0.2) against them.
    mask_photon = event_selector.select_photon(events.Photon, g_Hidx, mgg)
    photons_selected = events.Photon[mask_photon]
    nPho = ak.num(photons_selected)

    mask_tau = event_selector.select_tau(events.Tau, "all", isTight=False)
    mask_tau = mask_tau & looper_utils.mask_by_dR(events.Tau, photons_selected, 0.2)
    taus = events.Tau[mask_tau]
    nTau = ak.num(taus)

    mask_ele = event_selector.select_electron(events.Electron, isTight=False)
    mask_ele = mask_ele & looper_utils.mask_by_dR(events.Electron, photons_selected, 0.2)
    electrons = events.Electron[mask_ele]
    nEle = ak.num(electrons)

    mask_mu = event_selector.select_muon(events.Muon, isTight=False)
    mask_mu = mask_mu & looper_utils.mask_by_dR(events.Muon, photons_selected, 0.2)
    muons = events.Muon[mask_mu]
    nMu = ak.num(muons)

    # Zero total charge of the *selected* taus and leptons. Summing over all
    # reconstructed objects would let candidates that fail the ID or the photon
    # cleaning veto an otherwise neutral (opposite-sign) event.
    sum_charge = (
        ak.sum(taus.charge, axis=1)
        + ak.sum(electrons.charge, axis=1)
        + ak.sum(muons.charge, axis=1)
    ) == 0

    # Pair requirements on the selected objects; the two tuples are
    # presumably (dR_min, dR_max) and (mll_min, mll_max) -- TODO confirm in looper_utils.
    mask_dR_mll_0lep_2tau = looper_utils.mask_by_dR_mll_0lep_2tau(taus, (0.2, 3.5), (30, 140))
    mask_dR_mll_1mu_1tau = looper_utils.mask_by_dR_mll_1lep_1tau(taus, muons, (0.3, 3.5), (20, 120))
    mask_dR_mll_1ele_1tau = looper_utils.mask_by_dR_mll_1lep_1tau(taus, electrons, (0.3, 3.5), (20, 120))

    mask_dipho = (nPho == 2)
    mask_1mu_1tau = mask_dipho & (nTau == 1) & (nEle == 0) & (nMu == 1) & sum_charge & mask_dR_mll_1mu_1tau
    mask_1ele_1tau = mask_dipho & (nTau == 1) & (nEle == 1) & (nMu == 0) & sum_charge & mask_dR_mll_1ele_1tau

    masks = {
        "dipho": mask_dipho,
        "0lep_1tau": mask_dipho & (nTau == 1) & (nEle == 0) & (nMu == 0),
        "0lep_2tau": mask_dipho & (nTau == 2) & (nEle == 0) & (nMu == 0) & sum_charge & mask_dR_mll_0lep_2tau,
        "1lep_1tau": mask_1mu_1tau | mask_1ele_1tau,
        "2lep_0tau": mask_dipho & (nTau == 0) & (nEle + nMu == 2) & sum_charge,
        "1lep_0tau": mask_dipho & (nTau == 0) & (nEle + nMu == 1),
    }

    hists = {}
    for key, mask in masks.items():
        print(key)
        weights_sel = weights[mask]
        hists[key] = make_plots.process_diphoton(events.Photon[mask], g_Hidx[mask], mgg[mask], weights_sel)
        hists[key].update(make_plots.process_tau(taus[mask], weights_sel))
        hists[key].update(make_plots.process_muon(muons[mask], weights_sel))
        hists[key].update(make_plots.process_electron(electrons[mask], weights_sel))
        if key == "1lep_1tau":
            # dict.update mutates in place and returns None; nothing to capture.
            hists[key].update(
                make_plots.process_1tau_1lep(taus[mask], muons[mask], electrons[mask], weights_sel)
            )

    return hists

In [10]:
%%time
hists = process(chunks["signal"][0])
#hists = process((chunks["signal"][0][0], 0, 200) )
#hists = process(chunks["ZH"][0])

dipho
0lep_1tau
0lep_2tau
1lep_1tau
2lep_0tau
1lep_0tau
CPU times: user 7.86 s, sys: 153 ms, total: 8.01 s
Wall time: 31.1 s


## Local checks

In [11]:
# Inspect individual histograms produced by the local single-chunk run.
#hists['1lep_1tau'].keys()
#hists['dipho']['n_electron']
#hists['0lep_2tau']['mtautau']
hists['1lep_1tau']['mtaumu']

bin,content
"(0,5.17241)",0 ± 0
"(5.17241,10.3448)",0 ± 0
"(10.3448,15.5172)",0 ± 0
"(15.5172,20.6897)",0 ± 0
[21 rows hidden],[21 rows hidden]
"(129.31,134.483)",0 ± 0
"(134.483,139.655)",0 ± 0
"(139.655,144.828)",0 ± 0
"(144.828,150)",0 ± 0


In [14]:
# Integral uncertainty of the dipho-category `pho_pT1` histogram.
#hists['dipho']['pho_pT1'].integral
hists['dipho']['pho_pT1'].integral_error

10.295630085604653

## Send to dask

In [12]:
from dask.distributed import Client

# Connect to the shared dask scheduler; `client` is what the map/gather cells use.
SCHEDULER_ADDRESS = "tcp://169.228.130.5:4595"
client = Client(SCHEDULER_ADDRESS)
client

0,1
Client  Scheduler: tcp://169.228.130.5:4595  Dashboard: http://169.228.130.5:13346/status,Cluster  Workers: 78  Cores: 78  Memory: 312.00 GB


In [12]:
from dask.distributed import Client

# Alternative: a local cluster of 30 single-threaded workers, 4GB each.
c = Client(
    n_workers=30,
    threads_per_worker=1,
    memory_limit='4GB',
)
c

0,1
Client  Scheduler: tcp://127.0.0.1:23798  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 30  Cores: 30  Memory: 120.00 GB


In [15]:
# Shut down the local cluster behind `c`.
c.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError


In [18]:
def g():
    """Return the paths matching ``./*py`` in the current working directory.

    Submitted to a dask worker to compare the files it can see with the
    ones visible locally.
    """
    from glob import glob
    return glob("./*py")
g()

['./utils.py',
 './condor_utils.py',
 './cachepreload.py',
 './__init__.py',
 './event_selector.py',
 './looper_utils.py',
 './make_plots.py']

In [19]:
# Run `g` on a dask worker to see which *py files the workers can import.
x = client.submit(g)

In [20]:
# Worker-side file list; compare with the local `g()` output above.
client.gather(x)

['./cachepreload.py',
 './event_selector.py',
 './looper_utils.py',
 './make_plots.py',
 './utils.py']

In [14]:
%%time
from dask.distributed import as_completed
import collections, functools, operator 

futures = {}
results = {}

cat_keys = ["dipho", "0lep_1tau", "0lep_2tau", "1lep_1tau", "2lep_0tau", "1lep_0tau"]

for process_key in chunks:
    print ("start process {}".format(process_key))
    #if key != "signal": continue
    futures[process_key] = client.map(process, chunks[process_key], retries=5)
    #futures[process_key] = c.map(process, chunks[process_key], retries=5)
    
    results_local = []
    results[process_key] = {}
    ## do you really need this while?
    while len(results_local) < len(chunks[process_key]): 
        ac = as_completed(futures[process_key], with_results=True)
        for future, result in ac:
            results_local.append(result)
            
    ## merge histograms by the key
    for cat_key in cat_keys: 
        dicts = [results_local[i][cat_key] for i in range(len(results_local))]  
        counter = collections.Counter() 
        for d in dicts:  
            counter.update(d) 
        results[process_key][cat_key] = dict(counter) 

start process EGamma_2018A
start process EGamma_2018B
start process EGamma_2018C
start process EGamma_2018D
start process DYJets
start process ttbar
start process ZG
start process WG
start process GJets_HT40To100
start process GJets_HT100To200
start process GJets_HT200To400
start process GJets_HT400To600
start process GJets_HT600ToInf
start process QCD_pT30To40
start process QCD_pT40ToInf
start process Diphoton
start process ZH
start process signal
CPU times: user 1min 4s, sys: 3.01 s, total: 1min 7s
Wall time: 1h 16min 8s


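## Save to disk

Histograms are written as JSON files under `./hists/<tag>/`:

- folder 1: category (dipho, 0lep_1tau, 0lep_2tau, ...)
    + folder 2: process (EGamma_2018A-D data, signal, ZG, ...)
        - files: one `<hist>.json` per histogram (pho_pT1, pho_eta1, pho_phi1, ...)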

In [16]:
# Processes present in the merged results.
results.keys()

dict_keys(['EGamma_2018A', 'EGamma_2018B', 'EGamma_2018C', 'EGamma_2018D', 'DYJets', 'ttbar', 'ZG', 'WG', 'GJets_HT40To100', 'GJets_HT100To200', 'GJets_HT200To400', 'GJets_HT400To600', 'GJets_HT600ToInf', 'QCD_pT30To40', 'QCD_pT40ToInf', 'Diphoton', 'ZH', 'signal'])

In [18]:
# Categories filled for the signal sample.
results['signal'].keys()

dict_keys(['dipho', '0lep_1tau', '0lep_2tau', '1lep_1tau', '2lep_0tau', '1lep_0tau'])

In [19]:
# Histogram names in the signal `dipho` category.
results['signal']['dipho'].keys()

dict_keys(['pho_pT1', 'pho_pT2', 'pho_pTom1', 'pho_pTom2', 'pho_eta1', 'pho_eta2', 'pho_phi1', 'pho_phi2', 'pho_id1', 'pho_id2', 'tau_pT1', 'tau_eta1', 'tau_phi1', 'tau_deeptau_vs_j_1', 'tau_deeptau_vs_m_1', 'tau_deeptau_vs_e_1', 'n_tau', 'mtautau', 'dR_tautau', 'tau_pT2', 'tau_eta2', 'tau_phi2', 'tau_deeptau_vs_j_2', 'tau_deeptau_vs_m_2', 'tau_deeptau_vs_e_2', 'muon_pT1', 'muon_eta1', 'muon_phi1', 'muon_iso1', 'n_muon', 'mmumu', 'dR_mumu', 'muon_pT2', 'muon_eta2', 'muon_phi2', 'muon_iso2', 'electron_pT1', 'electron_eta1', 'electron_phi1', 'electron_iso1', 'n_electron', 'mee', 'dR_ee', 'electron_pT2', 'electron_eta2', 'electron_phi2', 'electron_iso2'])

In [15]:
%%time
from subprocess import call
import json

process_keys = results.keys()
cat_keys = results['signal'].keys()

tag = "basic_dR_mll_cut"

for cat_key in cat_keys:
    for process_key in process_keys:
        dirname = './hists/' + tag + '/' + cat_key + '/' + process_key + '/'
        call('mkdir -p ' + dirname, shell=True)
        for hist_key in results[process_key][cat_key].keys():
            histname = dirname + hist_key
            results[process_key][cat_key][hist_key].to_json(histname+".json")
            #with open(histname, "w") as f:
            #    data = json.dump(results[process_key][cat_key][hist_key].to_json(histname+".json"), f)

CPU times: user 569 ms, sys: 1.65 s, total: 2.22 s
Wall time: 5.83 s


In [49]:
%%bash
# Inspect the saved output; keep this path in sync with `tag` in the save cell.
ls hists/basic_dR_mll_cut/dipho

Diphoton
DYJets
EGamma_2018A
EGamma_2018B
EGamma_2018C
EGamma_2018D
GJets_HT100To200
GJets_HT200To400
GJets_HT400To600
GJets_HT40To100
GJets_HT600ToInf
QCD_pT30To40
QCD_pT40ToInf
signal
ttbar
WG
ZG
ZH


## Scratch

In [9]:
# Sample names available in the chunk list.
chunks.keys()

dict_keys(['EGamma_2018A', 'EGamma_2018B', 'EGamma_2018C', 'EGamma_2018D', 'DYJets', 'ttbar', 'ZG', 'WG', 'GJets_HT40To100', 'GJets_HT100To200', 'GJets_HT200To400', 'GJets_HT400To600', 'GJets_HT600ToInf', 'QCD_pT30To40', 'QCD_pT40ToInf', 'Diphoton', 'ZH', 'signal'])

In [10]:
%%time
#obj_list_ = ["electron", "muon", "tau", "photon"] #, "others"]
obj_list_ = ["others", "idx", "tau", "photon"]
gHidxs = event_selector.get_gHidx(chunks["signal"][0]) 
evts = event_selector.prepare_inputs(chunks["signal"][0], obj_list_)
#evts = event_selector.prepare_inputs(chunks["EGamma_2018A"][0], obj_list_, isData=True)
evts

CPU times: user 3.73 s, sys: 149 ms, total: 3.88 s
Wall time: 6.07 s


In [15]:
# Reproduce the photon -> tau selection chain of `process` on the scratch events.
mgg = evts.ggMass
mask_photon = event_selector.select_photon(evts.Photon, gHidxs.gHidx, mgg)
photons_selected = evts.Photon[mask_photon]

# Tau selection (isTight=False), cleaned by dR (0.2) against the selected photons.
mask_tau = (
    event_selector.select_tau(evts.Tau, "all", isTight=False)
    & looper_utils.mask_by_dR(evts.Tau, photons_selected, 0.2)
)
nTau = ak.num(evts.Tau[mask_tau])

In [24]:
# Taus of events with exactly two selected taus; other events show as None.
evts.Tau.mask[nTau == 2]

<Array [None, None, None, None, ... None] type='33831 * option[var * {"chargedIs...'>

In [34]:
%%time
event_selector.process_dipho_1tau(evts, gHidx)

33831
[2 4 1 ... 1 2 2]
CPU times: user 44.4 ms, sys: 1.99 ms, total: 46.4 ms
Wall time: 42.7 ms


In [21]:
# `mask_diphoton` is only defined inside `process`; rebuild it from the
# scratch inputs so this cell also works in a fresh kernel.
mask_diphoton = event_selector.select_photon_byEvent(evts.Photon, gHidxs.gHidx, mgg)
evts.Photon[ak.num(evts.Photon[mask_diphoton]) == 2]

<Array [[{chargedHadronIso: 2.58, ... ] type='16344 * var * {"chargedHadronIso":...'>

In [14]:
# select_photon_byEvent flags for the photons of event 103
# (needs `mask_diphoton` defined in an earlier cell).
mask_diphoton[103]

<Array [False, True, False, False, False] type='5 * bool'>