In [7]:
import math
from __future__ import division
from collections import OrderedDict
from striped.job import SinglePointStripedSession as Session, IPythonDisplay
import histbook as hb

job_server = ("ifdb02.fnal.gov", 8765)
session = Session(job_server)

In [8]:
bg_datasets = """ 
WminusH_HToBB_WToLNu_M125_13TeV_powheg_pythia8,
ST_s-channel_4f_InclusiveDecays_13TeV-amcatnlo-pythia8
""".split()
bg_datasets = [ds.strip() for ds in bg_datasets if ds.strip()]

In [9]:
import fnal_column_analysis_tools.lookup_tools as lookup_tools
import cloudpickle
import zlib

# import a bunch of correction histograms
weightsext = lookup_tools.extractor()
correctionDescriptions = open("newCorrectionFiles.txt").readlines()
weightsext.add_weight_sets(correctionDescriptions)
weightsext.finalize()
weights_eval = weightsext.make_evaluator()
#let's pickle and zip it
weights_names = zlib.compress(cloudpickle.dumps(weightsext._names))
weights_vals = zlib.compress(cloudpickle.dumps(weightsext._weights))

#dir(weights_eval)
#print(weights_eval["muScaleFactor_TightId_Iso"])

In [10]:
all_hists = OrderedDict()
cat = hb.groupby("category", keeporder=True)

def add_1d(*args,**kwargs):
    h = None 
    if 'wgt' in kwargs:
        h = hb.Hist(hb.bin(*args), cat, weight=kwargs['wgt'])
    else:
        h = hb.Hist(hb.bin(*args), cat)    
    all_hists[args[0]] = h
    return h

add_1d("FatJetPt", 500, 0, 1000)
add_1d("MuonPt", 500, 0, 1000)
add_1d("ElectronPt", 500, 0, 1000)
add_1d("MuonE", 500, 0, 1000)
add_1d("ElectronE", 500, 0, 1000)
add_1d("MuonEta", 500, 0, 1000)
add_1d("ElectronEta", 500, 0, 1000)
add_1d("stripeThroughput", 300, 0, 1000000)
add_1d("FatJetCounts",15,0,15)
add_1d("MET",500,0,1000)
#add_1d("stripeTime", 1000, 0.0, 0.5)
#add_1d("leadingLeptonPt", 100, 0, 500,wgt="weight")
#add_1d("zMass", 120, 60, 180,wgt="weight")

display_opts = {'width': 300, 'height': 300}
def buildDisplay(hists, cols=2):
    rows = []
    for iRow in range(int(math.ceil(len(hists)/cols))):
        hrowname = hists.keys()[iRow*cols:(iRow+1)*cols]
        hrow = hists.values()[iRow*cols:(iRow+1)*cols]
        rows.append(hb.beside(*tuple(h.overlay("category").step(n, **display_opts) for h,n in zip(hrow, hrowname))))
    return IPythonDisplay(
        hb.below(*tuple(rows))
    )

display = buildDisplay(all_hists)



class Callback:
    def __init__(self, display):
        self.Display = display
        
    def on_histogram_update(self, nevents):
        self.Display.update()


In [11]:
#__worker_class__

# break sandbox
# https://stackoverflow.com/questions/33880646/access-module-sys-without-using-import-machinery
sys = next(getattr(c, f).__func__.__globals__['sys'] for c in ().__class__.__base__.__subclasses__() for f in dir(c) if isinstance(getattr(c, f, None), type((lambda: 0).__get__(0))) and 'sys' in getattr(c, f).__func__.__globals__)
if 'sandbox' in sys.modules:
    __builtins__['__import__'] = sys.modules['sandbox'].saved_import
    #import subprocess
    #raise Exception(subprocess.check_output("pip list".split(" ")))
    
import numpy as np
import awkward
import uproot_methods
import cloudpickle
import zlib
import time
from collections import OrderedDict
import fnal_column_analysis_tools
from fnal_column_analysis_tools.analysis_objects import JaggedCandidateArray
from fnal_column_analysis_tools.striped import ColumnGroup, PhysicalColumnGroup, jaggedFromColumnGroup
from fnal_column_analysis_tools.striped.WorkerAddons import Timer
from fnal_column_analysis_tools.lookup_tools import evaluator

class Worker(Timer):
    def __init__(self):
        super(Worker,self).__init__()
        self.Columns = ["MET_pt","AK15Puppi.pt","AK15Puppi.jetId","Jet.pt"
                        "nMuon","nElectron",
                        "Electron.pt","Electron.eta","Electron.phi",
                        "Electron.dxy", "Electron.dz","Electron.mass", 
                        "Electron.pfRelIso03_all", "Electron.mvaSpring16GP_WP90",
                        "Muon.pt", "Muon.eta", "Muon.phi",
                        "Muon.mass","Muon.dxy", "Muon.dz", "Muon.pfRelIso04_all" ]
        self.weights_eval = None        
        
    def run(self, events, job):
        stripelen = len(events.AK15Puppi.count)
        self.set_tic('all')
    
        if self.weights_eval is None:            
            self.weights_eval = evaluator(cloudpickle.loads(zlib.decompress(job["weights_names"])),
                                          cloudpickle.loads(zlib.decompress(job["weights_vals"])))
        weights_eval = self.weights_eval

        #Jet Selection
        fatJetCols = ColumnGroup(events,"AK15Puppi","pt","jetId")
        fatJet_new = jaggedFromColumnGroup(fatJetCols)
        
        fatJet_selection = ( (fatJet_new["jetId"] > 0) & (fatJet_new["pt"] > 200) ) 
        selected_fatJets = fatJet_new[fatJet_selection]
        
        #Muon Selection
        m_counts = Muon.count
        flat_muon_lorentz = uproot_methods.classes.TLorentzVectorArray.from_ptetaphim(events.Muon.pt,events.Muon.eta,events.Muon.phi,events.Muon.mass)
        muons = JaggedCandidateArray.candidatefromcounts(m_counts,"dxy","dz","pfRelIso04_all",p4=flat_muon_lorentz)        
        
        loose_m_selection = (muons.p4.pt>5)*(abs(muons.p4.eta)<2.4)*(abs(muons["dxy"])<0.5)*(abs(muons["dz"])<1)*(muons["pfRelIso04_all"]<0.4)
        loose_muons = muons[loose_m_selection]
        
        #Electron Selection
        e_counts = Electron.count
        #eleCols = ColumnGroup(events,"Electron","pt","eta","phi","mass")
        #jagged_ele = jaggedFromColumnGroup(eleCols)
        #flat_electron_lorentz = uproot.methods.classes.TLorentzVectorArray.from_ptetaphim(jagged_ele["pt"], jagged_ele["eta"],jagged_ele["phi"],jagged_ele["mass"])
        flat_electron_lorentz = uproot.methods.classes.TLorentzVectorArray.from_ptetaphim(events.Electron.pt, events.Electron.eta, events.Electron.phi, events.Electron.mass)
        electrons = JaggedCandidateArray.candidatefromcounts(e_counts,"dxy","dz","pfRelIso03_all","mvaSpring16GP_WP90",p4=flat_electron_lorentz)
    
        loose_e_selection = (electrons.p4.pt>7)*(abs(electrons.p4.eta)<2.4)*(abs(electrons["dxy"])<0.05)*(abs(electrons["dz"])<0.2)*(electrons["pfRelIso03_all"]<0.4)*(electrons["mvaSpring16GP_WP90"])
        loose_electrons = electrons[loose_e_selection]
        
        
        #I think I tried this before and it didn't work
        signal_events = events[selected_fatJets.counts==1 & loose_electrons.counts==0 & loose_muons.counts==0 ]

        #Calculate phi between MET and selected_fatJets
        dphipfmet = abs(signal_events.MET_phi - selected_fatJets.p4.phi)
        
        job.fill(
            category="test",
            MuonPt=loose_muons.p4.pt.flatten(),
            MuonEta=loose_muons.p4.eta.flatten(),
            MuonE=loose_muons.p4.E.flatten(),
            ElectronPt=loose_electrons.p4.pt.flatten(),
            ElectronEta=loose_electrons.p4.eta.flatten(),
            ElectronE=loose_electrons.p4.E.flatten(),
            FatJetPt=selected_fatJets["pt"].flatten(),
            FatJetCounts=selected_fatJets.counts.flatten(),
            MET=signal_events.MET_pt.flatten()
        ) 
        
        #profiling info
        self.set_toc('all')
        self.fill_job_timer_info(stripelen,job)        
        

In [12]:
display.init()
callback = Callback(display)
_ = map(lambda h: h.clear(), all_hists.values())

import time
nevents_total = 0
t1 = time.time()
for dataset in bg_datasets:
    job = session.createJob(dataset, 
            fraction=1.,
            user_callback=callback, 
            user_params = {"weights_names":weights_names,
                           "weights_vals":weights_vals},
            histograms=all_hists.values()
    )
    job.run()
    runtime = job.TFinish - job.TStart
    nevents = job.EventsProcessed
    nevents_total += nevents
    print "%-70s %7.3f M events, %7.3f M events/sec" % (dataset[:70], float(nevents)/1e6, nevents/runtime/1000000)
    #print "%s: %.6f million events/second" % (dataset, nevents/runtime/1000000)
    display.update()

t2 = time.time()
print "Total events processed: %d in %.1f seconds -> %.6f million events/second" %(nevents_total, t2-t1, nevents_total/(t2-t1)/1000000)


WminusH_HToBB_WToLNu_M125_13TeV_powheg_pythia8,                          0.000 M events,   0.000 M events/sec


Striped worker #7 excepton:
Traceback (most recent call last):
  File "product/worker/socket_worker_spawner3.py", line 79, in run
    self.runWorker(params, dxsock, addr)
  File "product/worker/socket_worker_spawner3.py", line 121, in runWorker
    nevents = worker.run()
  File "/home/ivm/build/striped/worker/StripedWorker2.py", line 141, in run
    use_data_cache = self.UseDataCache)
  File "/home/ivm/build/striped/worker/StripedWorker2.py", line 16, in __init__
    self.ClientDataset = client.dataset(dataset_name, columns)
  File "/home/ivm/striped_home/product/striped/client/StripedClient.py", line 633, in dataset
    return StripedDataset(self, name, preload_columns=preload_columns)
  File "/home/ivm/striped_home/product/striped/client/StripedClient.py", line 213, in __init__
    self.columns(preload_columns, include_size_columns=True)    # this will preload columns
  File "/home/ivm/striped_home/product/striped/client/StripedClient.py", line 357, in columns
    sc = cc.sizeColumn


Striped worker #6 excepton:
Traceback (most recent call last):
  File "product/worker/socket_worker_spawner3.py", line 79, in run
    self.runWorker(params, dxsock, addr)
  File "product/worker/socket_worker_spawner3.py", line 121, in runWorker
    nevents = worker.run()
  File "/home/ivm/build/striped/worker/StripedWorker2.py", line 141, in run
    use_data_cache = self.UseDataCache)
  File "/home/ivm/build/striped/worker/StripedWorker2.py", line 16, in __init__
    self.ClientDataset = client.dataset(dataset_name, columns)
  File "/home/ivm/striped_home/product/striped/client/StripedClient.py", line 633, in dataset
    return StripedDataset(self, name, preload_columns=preload_columns)
  File "/home/ivm/striped_home/product/striped/client/StripedClient.py", line 213, in __init__
    self.columns(preload_columns, include_size_columns=True)    # this will preload columns
  File "/home/ivm/striped_home/product/striped/client/StripedClient.py", line 357, in columns
    sc = cc.sizeColumn


Striped worker #0 excepton:
Traceback (most recent call last):
  File "product/worker/socket_worker_spawner3.py", line 79, in run
    self.runWorker(params, dxsock, addr)
  File "product/worker/socket_worker_spawner3.py", line 121, in runWorker
    nevents = worker.run()
  File "/home/ivm/build/striped/worker/StripedWorker2.py", line 141, in run
    use_data_cache = self.UseDataCache)
  File "/home/ivm/build/striped/worker/StripedWorker2.py", line 16, in __init__
    self.ClientDataset = client.dataset(dataset_name, columns)
  File "/home/ivm/striped_home/product/striped/client/StripedClient.py", line 633, in dataset
    return StripedDataset(self, name, preload_columns=preload_columns)
  File "/home/ivm/striped_home/product/striped/client/StripedClient.py", line 213, in __init__
    self.columns(preload_columns, include_size_columns=True)    # this will preload columns
  File "/home/ivm/striped_home/product/striped/client/StripedClient.py", line 357, in columns
    sc = cc.sizeColumn


ST_s-channel_4f_InclusiveDecays_13TeV-amcatnlo-pythia8                   0.000 M events,   0.000 M events/sec


Striped worker #2 excepton:
Traceback (most recent call last):
  File "product/worker/socket_worker_spawner3.py", line 79, in run
    self.runWorker(params, dxsock, addr)
  File "product/worker/socket_worker_spawner3.py", line 121, in runWorker
    nevents = worker.run()
  File "/home/ivm/build/striped/worker/StripedWorker2.py", line 141, in run
    use_data_cache = self.UseDataCache)
  File "/home/ivm/build/striped/worker/StripedWorker2.py", line 16, in __init__
    self.ClientDataset = client.dataset(dataset_name, columns)
  File "/home/ivm/striped_home/product/striped/client/StripedClient.py", line 633, in dataset
    return StripedDataset(self, name, preload_columns=preload_columns)
  File "/home/ivm/striped_home/product/striped/client/StripedClient.py", line 213, in __init__
    self.columns(preload_columns, include_size_columns=True)    # this will preload columns
  File "/home/ivm/striped_home/product/striped/client/StripedClient.py", line 357, in columns
    sc = cc.sizeColumn


Total events processed: 0 in 66.4 seconds -> 0.000000 million events/second


In [13]:
def moment(df, n, binval='mid'):
    """
        df: DataFrame with single-level MultiIndex specifying binning, and a shape of (nbins, )
        n: n-th moment of distribution, ignoring (over/under/nan)flow bins
        binval in ['left', 'right', 'mid']: point in the bin to use as bin value
    """
    vals = np.array([getattr(b, binval) for b in df.index if type(b) is not str])[1:-1]
    weights = np.array(df)[1:-2]  # ignore nanflow (last bin)
    moment = sum(pow(vals, n)*weights) / sum(weights)
    return moment

def mean(df, binval='mid'):
    return moment(df, 1, binval)

def std(df, binval='mid'):
    return np.sqrt(max(moment(df, 2, binval) - moment(df, 1, binval)**2, 0.))

In [14]:
stripeThroughput = mean(all_hists['stripeThroughput'].pandas()['count()']["all"])
nWorkers = 180
processingTime = nevents_total/stripeThroughput/nWorkers
print "Stripe processing throughput: %.0f evt/s" % stripeThroughput
print "Total throughput: %.0f evt/s" % (nevents_total/(t2-t1), )
print "Striped server overhead: %.1f %%" % ((1-processingTime/(t2-t1))*100, )

KeyError: 'all'