Fix parallel processing for python3 #52

Merged (5 commits) on Nov 23, 2023
11 changes: 5 additions & 6 deletions Fitter/TauES/createinputsTES.py
@@ -104,14 +104,14 @@ def main(args):
fname = "%s/%s_%s_tes_$OBS.inputs-%s-%s.root"%(outdir,analysis,chshort,era,tag)

print "Nominal inputs"
createinputs(fname, sampleset, observables, bins, filter=setup["processes"], dots=True)
createinputs(fname, sampleset, observables, bins, filter=setup["processes"], dots=True, parallel=parallel)

if "TESvariations" in setup:
for var in setup["TESvariations"]["values"]:
print "Variation: TES = %f"%var

newsampleset = sampleset.shift(setup["TESvariations"]["processes"], ("_TES%.3f"%var).replace(".","p"), "_TES%.3f"%var, " %.1d"%((1.-var)*100.)+"% TES", split=True,filter=False,share=True)
createinputs(fname,newsampleset, observables, bins, filter=setup["TESvariations"]["processes"], dots=True)
createinputs(fname,newsampleset, observables, bins, filter=setup["TESvariations"]["processes"], dots=True, parallel=parallel)
newsampleset.close()

if "systematics" in setup:
@@ -128,7 +128,7 @@ def main(args):
weightReplaced = [sysDef["nomWeight"],sysDef["altWeights"][iSysVar]] if "altWeights" in sysDef else ["",""]

newsampleset_sys = sampleset.shift(sysDef["processes"], sampleAppend, "_"+sysDef["name"]+sysDef["variations"][iSysVar], sysDef["title"], split=True,filter=False,share=True)
createinputs(fname,newsampleset_sys, observables, bins, filter=sysDef["processes"], replaceweight=weightReplaced, dots=True)
createinputs(fname,newsampleset_sys, observables, bins, filter=sysDef["processes"], replaceweight=weightReplaced, dots=True, parallel=parallel)
newsampleset_sys.close()

if "TESvariations" in setup:
@@ -137,7 +137,7 @@ def main(args):
for var in setup["TESvariations"]["values"]:
print "Variation: TES = %f"%var
newsampleset_TESsys = sampleset.shift(overlap_TES_sys, ("_TES%.3f"%var).replace(".","p")+sampleAppend, "_TES%.3f"%var+"_"+sysDef["name"]+sysDef["variations"][iSysVar], " %.1d"%((1.-var)*100.)+"% TES" + sysDef["title"], split=True,filter=False,share=True)
createinputs(fname,newsampleset_TESsys, observables, bins, filter=overlap_TES_sys, replaceweight=weightReplaced, dots=True)
createinputs(fname,newsampleset_TESsys, observables, bins, filter=overlap_TES_sys, replaceweight=weightReplaced, dots=True, parallel=parallel)
newsampleset_TESsys.close()


@@ -166,9 +166,8 @@ def main(args):

if __name__ == "__main__":
from argparse import ArgumentParser
argv = sys.argv
description = """Create input histograms for datacards"""
parser = ArgumentParser(prog="createInputs",description=description,epilog="Good luck!")
parser = ArgumentParser(description=description,epilog="Good luck!")
parser.add_argument('-y', '--era', dest='eras', nargs='*', choices=['2016','2017','2018','UL2016_preVFP','UL2016_postVFP','UL2017','UL2018','UL2018_v2p5'], default=['UL2017'], action='store',
help="set era" )
parser.add_argument('-c', '--config', dest='config', type=str, default='TauES/config/defaultFitSetupTES_mutau.yml', action='store',
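The repeated change in this file simply threads a `parallel` flag through to every `createinputs` call (the function itself lives in Fitter/python/plot/datacard.py below). A minimal sketch of that forwarding; the stub and the way `parallel` is set here are illustrative only, not the TauFW implementation:

```python
# Illustrative stub only: createinputs picks the flag out of its keyword arguments
# and later uses it to decide whether to delete histograms after writing them.
def createinputs(fname, sampleset, observables, bins, **kwargs):
    parallel = kwargs.get('parallel', True)
    print(f"createinputs({fname!r}): parallel={parallel}")

parallel = True  # assumed to be set earlier in main(), e.g. from a command-line option
createinputs("inputs_$OBS.root", None, [], [], dots=True, parallel=parallel)
```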
7 changes: 3 additions & 4 deletions Fitter/paper/createinputs.py
@@ -89,7 +89,7 @@ def main(args):
GMF = "genmatch_2<5"
if splitbydm:
idweights = getdmsf(era) # DM-dependent SF, pT > 20 GeV
print idweights
print(idweights)
sampleset.split('DY',[('ZTT_DM0', GMR+" && dm_2==0"), ('ZTT_DM1', GMR+" && dm_2==1"),
('ZTT_DM10',GMR+" && dm_2==10"),('ZTT_DM11',GMR+" && dm_2==11"),
('ZL',GML),('ZJ',GMJ)])
@@ -228,9 +228,8 @@ def main(args):

if __name__ == "__main__":
from argparse import ArgumentParser
argv = sys.argv
description = """Create input histograms for datacards"""
parser = ArgumentParser(prog="createInputs",description=description,epilog="Good luck!")
parser = ArgumentParser(description=description,epilog="Good luck!")
parser.add_argument('-y', '--era', dest='eras', nargs='*', choices=['2016','2017','2018','UL2017'], default=['UL2017'], action='store',
help="set era" )
parser.add_argument('-c', '--channel', dest='channels', nargs='*', choices=['mutau','mumu'], default=['mutau'], action='store',
@@ -245,5 +244,5 @@ def main(args):
LOG.verbosity = args.verbosity
PLOG.verbosity = args.verbosity
main(args)
print "\n>>> Done."
print("\n>>> Done.")

11 changes: 6 additions & 5 deletions Fitter/python/plot/datacard.py
@@ -138,10 +138,11 @@ def createinputs(fname,sampleset,obsset,bins,syst="",**kwargs):
if files[obs].cd(bin): # $FILE:$BIN/$PROCESS_$SYSTEMATC
hist.Write(name,TH1.kOverwrite)
TAB.printrow(hist.GetSumOfWeights(),hist.GetEntries(),obs.printbins(),name)
deletehist(hist) # clean memory
if not parallel: # avoid segmentation faults for parallel
deletehist(hist) # clean histogram from memory

# CLOSE
for obs, file in files.iteritems():
for obs, file in files.items():
file.Close()


@@ -170,7 +171,7 @@ def plotinputs(fname,varprocs,obsset,bins,**kwargs):
if isinstance(varprocs['Nom'],Systematic): # convert Systematic objects back to simple string
systs = varprocs # OrderedDict of Systematic objects
varprocs = OrderedDict()
for syskey, syst in systs.iteritems():
for syskey, syst in systs.items():
if syskey.lower()=='nom':
varprocs['Nom'] = syst.procs
else:
@@ -181,7 +182,7 @@
ftag = tag+obs.tag
fname_ = repkey(fname,OBS=obsname,TAG=ftag)
file = ensureTFile(fname_,'UPDATE')
for set, procs in varprocs.iteritems(): # loop over processes with variation
for set, procs in varprocs.items(): # loop over processes with variation
if set=='Nom':
systag = "" # no systematics tag for nominal
procs_ = procs[:]
@@ -250,7 +251,7 @@ def stackinputs(file,variable,processes,**kwargs):
exphists.append(hist)
for group in groups:
grouphists(exphists,*group,replace=True,regex=True,verb=0)
stack = Stack(variable,datahist,exphists)
stack = Stack(variable,datahist,exphists,clone=True)
stack.draw()
stack.drawlegend(ncols=2,twidth=0.9)
if text:
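The key functional change in datacard.py is that deletehist(hist) is now skipped when running in parallel, since deleting the ROOT objects right after writing them caused segmentation faults in parallel mode (per the inline comment). A sketch of the guarded clean-up, with illustrative names rather than the actual TauFW helpers:

```python
# Illustrative only: write histograms and delete them afterwards, but only when
# running serially; in parallel mode the explicit deletion is skipped to avoid
# the segmentation faults mentioned in the diff above.
from ROOT import TFile, TH1, TH1D

def write_and_clean(fname, hists, parallel=False):
    file = TFile.Open(fname, 'UPDATE')              # created if it does not exist
    for hist in hists:
        hist.Write(hist.GetName(), TH1.kOverwrite)  # overwrite any existing cycle
        if not parallel:                            # only clean up in serial mode
            hist.Delete()
    file.Close()

write_and_clean("test_inputs.root", [TH1D('h_test', 'h_test', 10, 0, 1)], parallel=True)
```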
28 changes: 14 additions & 14 deletions Plotter/config/samples.py
@@ -21,7 +21,7 @@ def getsampleset(channel,era,**kwargs):
#tag = kwargs.get('tag', "" ) # extra tag for sample file names
table = kwargs.get('table', True ) # print sample set table
setera(era) # set era for plot style and lumi-xsec normalization
if 'TT' in split and 'Top' in join: # don't join TT & ST
if split and 'TT' in split and 'Top' in join: # don't join TT & ST
join.remove('Top')
join += ['TT','ST']

@@ -177,19 +177,19 @@ def getsampleset(channel,era,**kwargs):
sampleset.stitch("W*Jets", incl='WJ', name='WJ' ) # W + jets
if "v2p5" in era:
sampleset.stitch("DY*J*M-50", incl='DYJ', name="DY_M50")
# The following skimming efficiencies should be removed when running on nTuples with corrected cutflow content!!!!
eff_nanoAOD_DYll=0.47435726,
eff_nanoAOD_DYll_0orp4j=0.439206,
eff_nanoAOD_DYll_1j=0.54153996,
eff_nanoAOD_DYll_2j=0.59700258,
eff_nanoAOD_DYll_3j=0.65562099,
eff_nanoAOD_DYll_4j=0.74383978,
eff_nanoAOD_mutau=0.82747870,
eff_nanoAOD_mutau_0orp4j=0.814466,
eff_nanoAOD_mutau_1j=0.847658,
eff_nanoAOD_mutau_2j=0.867779,
eff_nanoAOD_mutau_3j=0.889908,
eff_nanoAOD_mutau_4j=0.922111) # Drell-Yan, M > 50 GeV
## The following skimming efficiencies should be removed when running on nTuples with corrected cutflow content!!!!
# eff_nanoAOD_DYll=0.47435726,
# eff_nanoAOD_DYll_0orp4j=0.439206,
# eff_nanoAOD_DYll_1j=0.54153996,
# eff_nanoAOD_DYll_2j=0.59700258,
# eff_nanoAOD_DYll_3j=0.65562099,
# eff_nanoAOD_DYll_4j=0.74383978,
# eff_nanoAOD_mutau=0.82747870,
# eff_nanoAOD_mutau_0orp4j=0.814466,
# eff_nanoAOD_mutau_1j=0.847658,
# eff_nanoAOD_mutau_2j=0.867779,
# eff_nanoAOD_mutau_3j=0.889908,
# eff_nanoAOD_mutau_4j=0.922111) # Drell-Yan, M > 50 GeV
else:
sampleset.stitch("DY*J*M-50", incl='DYJ', name="DY_M50")
#sampleset.stitch("DY*J*M-10to50", incl='DYJ', name="DY_M10to50" )
2 changes: 1 addition & 1 deletion Plotter/plot.py
@@ -99,7 +99,7 @@ def plot(sampleset,setup,parallel=True,tag="",extratext="",outdir="plots",era=""
stack.drawlegend() #position)
stack.drawtext(text)
stack.saveas(fname,ext=exts,tag=tag)
stack.close()
stack.close(keep=True)


def main(args):
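Together with Stack(variable, datahist, exphists, clone=True) in datacard.py above, stack.close(keep=True) appears to shift histogram ownership so that histograms coming back from the parallel backend are not deleted while still in use. The class below is a hypothetical Stack-like sketch, only to illustrate that ownership pattern; it is not the TauFW Plotter Stack:

```python
class SimpleStack:
    """Hypothetical stand-in illustrating clone/keep ownership semantics."""
    def __init__(self, hists, clone=False):
        # clone=True: draw private copies and leave the caller's histograms untouched
        self.hists = [h.Clone(h.GetName()+"_stack") for h in hists] if clone else list(hists)
    def close(self, keep=False):
        if not keep:             # keep=True: the caller still owns and uses the histograms
            for h in self.hists:
                h.Delete()
        self.hists = []
```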
3 changes: 2 additions & 1 deletion Plotter/python/methods/QCD_OSSS.py
@@ -144,7 +144,8 @@ def QCD_OSSS(self, variables, selection, **kwargs):
LOG.verbose("SampleSet.QCD_OSSS: SS yields: data=%.1f, exp=%.1f, qcd=%.1f, scale=%.3f"%(ndata,nexp,nqcd,scale),verbosity,level=2)

# CLEAN
deletehist([datahist,exphist]+exphists)
if not parallel: # avoid segmentation faults for parallel
deletehist([datahist,exphist]+exphists) # clean histogram from memory

return qcdhists

145 changes: 145 additions & 0 deletions Plotter/test/testRDataFrame.py
@@ -0,0 +1,145 @@
#!/usr/bin/env python3
# Author: Izaak Neutelings (November 2023)
# Description: Test RDataFrame to run some samples in parallel
# References:
# https://root.cern/doc/master/classROOT_1_1RDataFrame.html#parallel-execution
import os, re
import time
from array import array
import ROOT; ROOT.PyConfig.IgnoreCommandLineOptions = True # to avoid conflict with argparse
from ROOT import gROOT, gStyle, TNamed, RDataFrame, RDF, TCanvas, TLegend,\
kRed, kGreen, kBlue, kMagenta
#RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
gROOT.SetBatch(True) # don't open GUI windows
gStyle.SetOptTitle(False) # don't make title on top of histogram
ROOT.EnableImplicitMT(4) # number of cores
rexp_esc = re.compile(r"[-+()\[\]\.:]") # escape characters for filename-safe
rexp_stit = re.compile(r"([^/]+).root") # get filename without extension
TNamed.__repr__ = lambda o: "<%s(%r,%r) at %s>"%(o.__class__.__name__,o.GetName(),o.GetTitle(),hex(id(o)))
#RDF.RInterface.__repr__ = lambda o: "xxx<%s(%r,%r) at %s>"%(o.__class__.__name__,o.GetName(),o.GetTitle(),hex(id(o)))
lcolors = [kBlue+1, kRed+1, kGreen+1, kMagenta+1]

def took(start_wall,start_cpu,pre=""):
time_wall = time.time() - start_wall # wall clock time
time_cpu = time.perf_counter() - start_cpu # CPU time
return "%s %d min %.1f sec (CPU: %d min %.1f sec)"%((pre,)+divmod(time_wall,60)+divmod(time_cpu,60))


def plot(fname,hists,verb=0):
print(f">>> plot: {fname!r}: {hists!r}")
canvas = TCanvas('canvas','canvas',100,100,1000,800) # XYWH
canvas.SetMargin(0.10,0.04,0.11,0.02) # LRBT
canvas.SetLogy()
canvas.SetTicks(1,0) # draw ticks on opposite side
#legend = TLegend()
for hist, color in zip(hists,lcolors):
hist.SetLineWidth(2)
hist.SetLineColor(color)
hist.Draw('HIST E1 SAME')
#legend.Draw()
canvas.BuildLegend()
canvas.SaveAs(fname)
canvas.Close()


def sampleset_RDF(fnames,tname,vars,sels,rungraphs=True,verb=0):
print(f">>> sampleset_RDF")
results = [ ]
res_dict = { } # { selection : { variable: [ hist1, ... ] } }

# BOOK histograms
start = time.time(), time.perf_counter() # wall-clock & CPU time
for stitle, fname, weight in fnames:
rdframe = RDataFrame(tname,fname)
wname = weight
if verb>=1:
print(f">>> sampleset_RDF: Created RDF {rdframe!r} for {fname}...")
if not rdframe.HasColumn(wname): # to compile mathematical expressions in xvar
wname = "_rdf_sample_weight"
if verb>=1:
print(f">>> sampleset_RDF: Defining {wname!r} as {weight!r}...")
rdframe = rdframe.Define(wname,weight) # define column for this variable

# SELECTIONS: add filters
for sname, sel in sels:
print(f">>> sampleset_RDF: Selections {sel!r}...")
rdf_sel = rdframe.Filter(sel)
if verb>=1:
print(f">>> sampleset_RDF: Created RDF {rdf_sel!r} with filter {sel!r}...")

# VARIABLES: book histograms
for vname, vexp, *bins in vars:
model = (vname,stitle,*bins) # RDF.TH1DModel
rdf_var = rdf_sel
if not rdf_var.HasColumn(vexp): # to compile mathematical expressions in xvar
if verb>=2:
print(f">>> sampleset_RDF: Defining {vname!r} as {vexp!r}...")
rdf_var = rdf_var.Define(vname,vexp) # define column for this variable
vexp = vname
if verb>=2:
print(f">>> sampleset_RDF: Booking {vname!r} with model {model!r} and RDF {rdf_var!r}...")
result = rdf_var.Histo1D(model,vexp,wname)
if verb>=2:
print(f">>> sampleset_RDF: Booked {vname!r}: {result!r}")
results.append(result)
res_dict.setdefault(sname,{ }).setdefault(vname,[ ]).append(result) #.GetValue()
print(f">>> sampleset_RDF: Booking took {took(*start)}")

# RUN events loops to fill histograms
start = time.time(), time.perf_counter() # wall-clock & CPU time
if rungraphs:
print(f">>> sampleset_RDF: Start RunGraphs of {len(results)} results...")
RDF.RunGraphs(results)
else:
print(f">>> sampleset_RDF: Start Draw for {len(results)} results...")
for result in results:
print(f">>> sampleset_RDF: Start {result} ...")
result.Draw('hist')
print(f">>> sampleset_RDF: Processing took {took(*start)}")

# PLOT histograms
for sname in res_dict:
for vname in res_dict[sname]:
fname = f"{vname}_{sname}.png"
hists = [r.GetValue() for r in res_dict[sname][vname]]
plot(fname,hists,verb=verb)


def main(args):
ROOT.EnableImplicitMT(args.ncores) # number of cores
verbosity = args.verbosity #+2
indir = "/scratch/ineuteli/analysis/UL2018"
tname = 'tree'
fnames = args.fnames or [
('DYJets', f"{indir}/DY/DYJetsToLL_M-50_mutau.root", "2.65e-5*genweight"), # 5343/201506219
('TTTo2L2N',f"{indir}/TT/TTTo2L2Nu_mutau.root", "5.61e-9*genweight"), # 88.29/15748570179.6
('TTToHadr',f"{indir}/TT/TTToHadronic_mutau.root", "2.57e-9*genweight"), # 377.96/146924304906.7
('TTToSemi',f"{indir}/TT/TTToSemiLeptonic_mutau.root", "2.05e-9*genweight"), # 365.35/177965649707.7
]
sels = [
("ss_ptgt20","q_1*q_2<0 && pt_1>20 && pt_2>20"),
("os_ptgt20","q_1*q_2>0 && pt_1>20 && pt_2>20"),
("os_ptgt40","q_1*q_2>0 && pt_1>40 && pt_2>40"),
]
ptbins = array('d',[0,10,15,20,21,23,25,30,40,60,100,150,200,300,500])
vars = [
('pt_1', 'pt_1',50,0,250),
('pt_1_rebin','pt_1',len(ptbins)-1,ptbins),
('deta', 'abs(eta_1-eta_2)',50,0,5),
]
#sampleset_RDF(fnames,tname,vars,sels,rungraphs=False,verb=verbosity)
sampleset_RDF(fnames,tname,vars,sels,rungraphs=True,verb=verbosity)


if __name__ == "__main__":
from argparse import ArgumentParser
description = """Test RDataFrame."""
parser = ArgumentParser(description=description,epilog="Good luck!")
parser.add_argument('-i', '--fnames', nargs='+', help="input files" )
parser.add_argument('-t', '--tag', default="", help="extra tag for output" )
parser.add_argument('-n', '--ncores', default=4, type=int, help="number of cores, default=%(default)s" )
parser.add_argument('-v', '--verbose', dest='verbosity', type=int, nargs='?', const=1, default=0, action='store',
help="set verbosity" )
args = parser.parse_args()
main(args)
print("\n>>> Done.")