In [1]:
import ROOT
import dask
import os
import numpy as np
import math
import pandas
from ROOT import RooRealVar, RooBreitWigner, RooCBShape, RooFFTConvPdf, RooChebychev, RooAddPdf, RooDataHist, RooPlot, TCanvas, TPad, TH1, TF1
from ROOT import RooArgList
import time
# Initialize ROOT
ROOT.PyConfig.IgnoreCommandLineOptions = True
#ROOT.ROOT.EnableImplicitMT()
from dask.distributed import LocalCluster, Client

Welcome to JupyROOT 6.26/08


In [2]:
class variable(object):
    def __init__(self, name, title, nbins=None, xmin=None, xmax=None):
        self._name = name
        self._title = title
        self._nbins = nbins
        self._xmin = xmin
        self._xmax = xmax
    def __str__(self):
        return  '\"'+str(self._name)+'\",\"'+str(self._title)+'\",\"'+str(self._nbins)+','+str(self._xmin)+','+str(self._xmax)

my_vars = []

my_vars.append(variable(name = "e1_energy", title= "leading electron energy [GeV]", nbins = 50, xmin = 0, xmax=100))
my_vars.append(variable(name = "e2_energy", title= "sub leading electron energy [GeV]", nbins = 50, xmin = 0, xmax=100))
my_vars.append(variable(name = "m_ee", title= "Zee invariant mass, m_{ee} [GeV]", nbins = 50, xmin = 84, xmax=98))

In [10]:
#global variables                                                                                                                                                
fit_lowcut = 84.
fit_highcut = 98.
NbinsX = 50

nmaxiteration= 100
recreate_files= True

nmaxpartition = 1
distributed = False


#file = ROOT.TFile.Open("/home/jovyan/work/RDataFrame_test/ee_Z_ee_EDM4Hep.root")
file = ROOT.TFile.Open("/home/jovyan/work/RDataFrame_test/input_times_200.root")

for a in file.GetListOfKeys():
    print(a)
    
folder = "/home/jovyan/work/RDataFrame_test/output/mytest_Zee/"
if not os.path.exists(folder):
    os.mkdir(folder)
repohisto = folder+"plots/"
if not os.path.exists(repohisto):
    os.mkdir(repohisto)    
    
text_file = open("/home/jovyan/work/RDataFrame_test/utils/functions.h", "r")   
data = text_file.read()

Name: events Title: Events tree
Name: events Title: Events tree


In [11]:
def my_initialization_function():
    print(ROOT.gInterpreter.ProcessLine(".O"))
    ROOT.gInterpreter.Declare('{}'.format(data))
    print("end of initialization")
    
def create_connection():
    """
    Setup connection to a Dask cluster. Two ingredients are needed:
    1. Creating a cluster object that represents computing resources. This can be
       done in various ways depending on the type of resources at disposal. To use
       only the local machine (e.g. your laptop), a `LocalCluster` object can be
       used. This step can be skipped if you have access to an existing Dask
       cluster; in that case, the cluster administrator should provide you with a
       URL to connect to the cluster in step 2. More options for cluster creation
       can be found in the Dask docs at
       http://distributed.dask.org/en/stable/api.html#cluster .
    2. Creating a Dask client object that connects to the cluster. This accepts
       directly the object previously created. In case the cluster was setup
       externally, you need to provide an endpoint URL to the client, e.g.
       'https://myscheduler.domain:8786'.
 
    Through Dask, you can connect to various types of cluster resources. For
    example, you can connect together a set of machines through SSH and use them
    to run your computations. This is done through the `SSHCluster` class. For
    example:
 
    ```python
    from dask.distributed import SSHCluster
    cluster = SSHCluster(
        # A list with machine host names, the first name will be used as
        # scheduler, following names will become workers.
        hosts=["machine1","machine2","machine3"],
        # A dictionary of options for each worker node, here we set the number
        # of cores to be used on each node.
        worker_options={"nprocs":4,},
    )
    ```
 
    Another common usecase is interfacing Dask to a batch system like HTCondor or
    Slurm. A separate package called dask-jobqueue (https://jobqueue.dask.org)
    extends the available Dask cluster classes to enable running Dask computations
    as batch jobs. In this case, the cluster object usually receives the parameters
    that would be written in the job description file. For example:
 
    ```python
    from dask_jobqueue import HTCondorCluster
    cluster = HTCondorCluster(
        cores=1,
        memory='2000MB',
        disk='1000MB',
    )
    # Use the scale method to send as many jobs as needed
    cluster.scale(4)
    ```
 
    In this tutorial, a cluster object is created for the local machine, using
    multiprocessing (processes=True) on 4 workers (n_workers=4) each using only
    1 core (threads_per_worker=1).
    """
    cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=True)
    client = Client(cluster)
    return client

In [12]:
def myGetFitParameters(map_histog, mean_bw, input_width, input_sigma, path, m_sf, NbinsX, fit_lowcut, fit_highcut):
    parameters = []

    x = RooRealVar("x", "x", fit_lowcut, fit_highcut)  # 84,98//80-100                                                                                           
    x.setBins(10000, "cache")
    x.setMin("cache", 64.)
    x.setMax("cache", 118.)

    m0 = RooRealVar("m0", "m0", mean_bw, fit_lowcut, fit_highcut)  # 80-100                                                                                      
    width = RooRealVar("width", "width", input_width, 1., 4.)
    bw = RooBreitWigner("bw", "bw", x, m0, width)

    mean = RooRealVar("mean", "mean", 0.)
    sigma = RooRealVar("sigma", "sigma", input_sigma, 1., 5.)
    alpha = RooRealVar("alpha", "alpha", 1.3)
    n = RooRealVar("n", "n", 5.1)
    cb = RooCBShape("cb", "cb", x, mean, sigma, alpha, n)

    pdf_sig = RooFFTConvPdf("pdf_sig", "pdf_sig", x, bw, cb)
    coef0 = RooRealVar("c0", "coefficient #0", 1.0, -.01, 0.01)
    coef1 = RooRealVar("c1", "coefficient #1", -0.1, -.01, 0.01)
    coef2 = RooRealVar("c2", "coefficient #2", -0.1, -.01, 0.01)
    bkg1 = RooChebychev("bkg1", "bkg1", x, RooArgList(coef0, coef1, coef2))
    fsig = RooRealVar("fsig", "signal fraction", 0.9, 0., 1.)
    pdf = RooAddPdf("pdf", "pdf", RooArgList(pdf_sig, bkg1), RooArgList(fsig))
    histo = RooDataHist("histo", "histo", x, Import=map_histog)
    x.setRange("signal", fit_lowcut, fit_highcut)

    ROOT.Math.MinimizerOptions.SetDefaultMinimizer("Minuit2")
    ROOT.Math.MinimizerOptions.SetDefaultTolerance(0.0000001)
    ROOT.Math.MinimizerOptions.SetDefaultPrecision(0.0000001)

    pdf.fitTo(histo, SumW2Error=True, Range="signal")

    canv = ROOT.TCanvas("canv", "canv", 800, 600)
    frame1 = x.frame(Bins=NbinsX, Title="Convolution of a Breit-Wigner and a Crystal-Ball, Chebychev pol. bkg")
    histo.plotOn(frame1, Name="Data")
    pdf.plotOn(frame1, Name="pdf", LineColor=ROOT.kRed)
    pdf.paramOn(frame1, Layout=0.60)
    pdf.plotOn(frame1, Components="bkg1", LineStyle=ROOT.kDotted, LineColor=ROOT.kBlue)

    canvas = TCanvas("canvas", "canvas", 800, 600)
    canvas.cd()
    pad1 = TPad("pad1", "pad1name", 0.01, 0.31, 0.99, 0.99)
    pad2 = TPad("pad2", "pad2name", 0.01, 0.01, 0.99, 0.41)
    pad1.Draw()
    pad2.Draw()
    pad1.cd()
    pad1.SetBottomMargin(0.16)
    pad2.SetBottomMargin(0.24)
    frame1.GetYaxis().SetTitleOffset(1.4)
    frame1.GetXaxis().SetTitle("m_ee [GeV], sf_"+str(i_sf))
    frame1.Draw()
    pad1.Modified()
    pad1.RedrawAxis()
    pad1.Update()
    pad2.cd()

    tf1_model = pdf.asTF(x)
    clone_data = histo.createHistogram("clone_data",x,Binning=(NbinsX,fit_lowcut,fit_highcut))
    pdfHisto_data = pdf.generateBinned({x}, 1000000)
    clone_fit_data = pdfHisto_data.createHistogram("clone_fit_data",x,Binning=(NbinsX,fit_lowcut,fit_highcut))
    clone_fit_data.Scale(clone_data.Integral()/clone_fit_data.Integral())

    pdfHisto = pdf_sig.generateBinned({x}, 1000000)
    clone_fit = pdfHisto.createHistogram("clone_fit",x,Binning=(NbinsX,fit_lowcut,fit_highcut))
    clone_fit.Scale(clone_data.Integral()/clone_fit.Integral())
    
    x1 = fit_lowcut
    x2 = fit_highcut
    bin1 = clone_data.FindBin(x1)
    bin2 = clone_data.FindBin(x2)

    for i in range(0,clone_data.GetNbinsX() + 1):
        if i < bin1:
            clone_data.SetBinContent(i, 0.)
        if i > bin2:
            clone_data.SetBinContent(i, 0.)


    clone_data.Divide(clone_fit_data)

    clone_data.GetXaxis().SetTitle("m_ee [GeV], sf_"+str(i_sf))
    clone_data.GetYaxis().SetTitle("DATA / FIT")
    clone_data.GetXaxis().SetRangeUser(fit_lowcut, fit_highcut)
    clone_data.GetYaxis().SetRangeUser(0., 2.)
    clone_data.GetXaxis().SetLabelSize(0.1)
    clone_data.GetYaxis().SetLabelSize(0.08)
    clone_data.GetXaxis().SetTitleSize(0.08)
    clone_data.GetYaxis().SetTitleSize(0.09)
    clone_data.GetYaxis().SetTitleOffset(0.6)
    clone_data.GetXaxis().SetTitleOffset(1.2)
    clone_data.Draw("E1")
    pad2.Modified()
    pad2.SetGridy()
    pad2.RedrawAxis()
    pad2.Update()

    output_folder = path + "FitPlots"
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    canvas.SaveAs(path + "FitPlots/sf_"+str(i_sf)+".pdf")
    canvas.SaveAs(path + "FitPlots/sf_"+str(i_sf)+".root")

    modelMean = tf1_model.GetMaximumX()

    parameters.append(modelMean)  # 0                                                                                                                            
    parameters.append(m0.getError())  # 1                                                                                                                        
    parameters.append(m0.getVal())  # 2                                                                                                                          
    parameters.append(sigma.getVal())  # 3  
    parameters.append(sigma.getError())  # 4                                                                                                                     
    parameters.append(mean.getVal())  # 5                                                                                                                        
    parameters.append(mean.getError())  # 6                                                                                                                      
    parameters.append(alpha.getVal())  # 7                                                                                                                       
    parameters.append(alpha.getError())  # 8                                                                                                                     
    parameters.append(n.getVal())  # 9                                                                                                                           
    parameters.append(n.getError())  # 10                                                                                                                        
    parameters.append(width.getVal())  # 11                                                                                                                      
    parameters.append(width.getError())  # 12                                                                                                                    

    #print("width =", width.getVal())

    # Elimina gli oggetti                                                                                                                                        
    del frame1
    del clone_data
    del clone_fit
    del pad1
    del pad2

    return parameters

In [13]:
def bookhisto(df, var, nmaxiteration):
    h_ = {}
    
    for i_sf in range(0,nmaxiteration):
        #i_sf = str(i_sf)
        #h_[i_sf] = {}
        for v in var:           
            h_[v._name+"_"+str(i_sf)]= df.Histo1D(ROOT.RDF.TH1DModel(v._name+"_"+str(i_sf), v._title+"; Events", v._nbins, v._xmin, v._xmax), v._name+"_"+str(i_sf))
            #print(v._name+"_"+str(i_sf))
            #h_[i_sf][v._name].GetValue()
        
    print("Done bookhisto!")
    return h_    


def savehisto(h, var, nmaxiteration, repohisto):

    #for i_sf in range(0,nmaxiteration):
    #    i_sf = str(i_sf)
    #    for v in var:
    #        histo = ROOT.TH1D(v._name+"_"+str(i_sf),v._title+"; Events", v._nbins, v._xmin, v._xmax)
            
    
    label="m_ee_test"
    
    Z_resolution = []
    
    if recreate_files== True:
        outfile = ROOT.TFile.Open(repohisto+label+'.root', "RECREATE")
    else:
        outfile = ROOT.TFile.Open(repohisto+label+'.root', "Update")
    
   
    
    for i_sf in range(0,nmaxiteration):
        #i_sf = str(i_sf)
        #print(i_sf)
        #print(nmaxiteration)
        #h[i_sf] = {}
        for v in var:
            #print(h.keys())
            tmp = h[v._name+"_"+str(i_sf)].GetValue()
            outfile.cd()
            #histo.Write()
            tmp.Sumw2()
            if v._name == "Z_ee":
                Z_resolution.append(myGetFitParameters(m_histo_Mee, m_histo_Mee.GetMean(),width_mass_mc, sigma_mass_mc, folder, i_sf, NbinsX, fit_lowcut, fit_highcut)[3])
                #Z_resolution.append(tmp.GetMean())
    
    outfile.Close()
    

In [14]:
#MAIN                                                                                                                                                            

# set up everything properly
if distributed == True:
    RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
    ROOT.RDF.Experimental.Distributed.initialize(my_initialization_function)
else:
    RDataFrame = ROOT.RDataFrame
    my_initialization_function()


# Create an RDataFrame that will use Dask as a backend for computations
if distributed ==True:
    connection = create_connection()
    df = RDataFrame("events", file, npartitions=nmaxpartition, 
                            daskclient=connection)
else:
    df = RDataFrame("events", file)


var = my_vars

for v in var:
    print(v._name)

df = df.Define('w_nominal', '1')
df = df.Define("m_e","0.0005124") #GeV                                                                                                                           
df_ge = df.Define("goodelectrons", "Particle.charge[0]*Particle.charge[1] < 0.").Filter("goodelectrons > 0")


# Inizia a misurare il tempo
start_time = time.time()


width_mass_mc = 2.49 #GeV                                                                                                                                        
sigma_mass_mc = 2.6 #GeV                                                                                                                                         


df_Mee = df_ge

for i_sf in range(0,nmaxiteration):

    df_Mee = df_Mee.Define("m_ee_"+str(i_sf), "ComputeInvariantMass(Particle.momentum.x, Particle.momentum.y, Particle.momentum.z, ComputeEnergy(Particle.momentum.x, Partic\
le.momentum.y, Particle.momentum.z,m_e))")

    '''                                                                                                                                                          
    che pesi usare?                                                                                                                                              
    df = df.Define("w_nominal","scaleFactor_ELECTRON * scaleFactor_ElectronTRIGGER * scaleFactor_PILEUP * mcWeight");                                               
    '''

    df_Mee = df_Mee.Define("e1_energy_"+str(i_sf),"ComputeEnergy(Particle.momentum.x, Particle.momentum.y, Particle.momentum.z,m_e)[0]")
    df_Mee = df_Mee.Define("e2_energy_"+str(i_sf),"ComputeEnergy(Particle.momentum.x, Particle.momentum.y, Particle.momentum.z,m_e)[1]")

    
    
tmp=bookhisto(df_Mee, var, nmaxiteration)
savehisto(tmp, var, nmaxiteration, repohisto)


# Termina la misurazione del tempo
end_time = time.time()

# Calcola il tempo trascorso
elapsed_time = end_time - start_time

# Stampa il risultato
print("Tempo impiegato in secondi: ", elapsed_time)    


    

0
end of initialization
e1_energy
e2_energy
m_ee
Done bookhisto!
Tempo impiegato in secondi:  1057.1676924228668


input_line_129:56:7: error: redefinition of 'ComputeInvariantMass'
float ComputeInvariantMass(Vec_t px, Vec_t py, Vec_t pz, Vec_t e) {                                                                                              
      ^
input_line_103:56:7: note: previous definition is here
float ComputeInvariantMass(Vec_t px, Vec_t py, Vec_t pz, Vec_t e) {                                                                                              
      ^
input_line_129:63:7: error: redefinition of 'ComputeEnergy'
RVecF ComputeEnergy(Vec_t px, Vec_t py, Vec_t pz, double m_e) {                                                                                                  
      ^
input_line_103:63:7: note: previous definition is here
RVecF ComputeEnergy(Vec_t px, Vec_t py, Vec_t pz, double m_e) {                                                                                                  
      ^


In [None]:
print('ciao')