In [1]:
import ROOT
import dask
import os
import time
# Initialize ROOT
# ROOT.PyConfig.IgnoreCommandLineOptions = True

ROOT.RDF.Experimental.Distributed.open_files_locally = False
from dask.distributed import LocalCluster, Client
from distributed.diagnostics.plugin import UploadFile

Welcome to JupyROOT 6.27/01


In [2]:
class variable(object):
    def __init__(self, name, title, nbins=None, xmin=None, xmax=None):
        self._name = name
        self._title = title
        self._nbins = nbins
        self._xmin = xmin
        self._xmax = xmax
    def __str__(self):
        return  '\"'+str(self._name)+'\",\"'+str(self._title)+'\",\"'+str(self._nbins)+','+str(self._xmin)+','+str(self._xmax)

my_vars = []

my_vars.append(variable(name = "e1_energy", title= "leading electron energy [GeV]", nbins = 50, xmin = 0, xmax=100))
my_vars.append(variable(name = "e2_energy", title= "sub leading electron energy [GeV]", nbins = 50, xmin = 0, xmax=100))
my_vars.append(variable(name = "m_ee", title= "Zee invariant mass, m_{ee} [GeV]", nbins = 50, xmin = 84, xmax=98))

In [3]:
def create_connection():
    """
    Setup connection to a Dask cluster. Two ingredients are needed:
    1. Creating a cluster object that represents computing resources. This can be
       done in various ways depending on the type of resources at disposal. To use
       only the local machine (e.g. your laptop), a `LocalCluster` object can be
       used. This step can be skipped if you have access to an existing Dask
       cluster; in that case, the cluster administrator should provide you with a
       URL to connect to the cluster in step 2. More options for cluster creation
       can be found in the Dask docs at
       http://distributed.dask.org/en/stable/api.html#cluster .
    2. Creating a Dask client object that connects to the cluster. This accepts
       directly the object previously created. In case the cluster was setup
       externally, you need to provide an endpoint URL to the client, e.g.
       'https://myscheduler.domain:8786'.
 
    Through Dask, you can connect to various types of cluster resources. For
    example, you can connect together a set of machines through SSH and use them
    to run your computations. This is done through the `SSHCluster` class. For
    example:
 
    ```python
    from dask.distributed import SSHCluster
    cluster = SSHCluster(
        # A list with machine host names, the first name will be used as
        # scheduler, following names will become workers.
        hosts=["machine1","machine2","machine3"],
        # A dictionary of options for each worker node, here we set the number
        # of cores to be used on each node.
        worker_options={"nprocs":4,},
    )
    ```
 
    Another common usecase is interfacing Dask to a batch system like HTCondor or
    Slurm. A separate package called dask-jobqueue (https://jobqueue.dask.org)
    extends the available Dask cluster classes to enable running Dask computations
    as batch jobs. In this case, the cluster object usually receives the parameters
    that would be written in the job description file. For example:
 
    ```python
    from dask_jobqueue import HTCondorCluster
    cluster = HTCondorCluster(
        cores=1,
        memory='2000MB',
        disk='1000MB',
    )
    # Use the scale method to send as many jobs as needed
    cluster.scale(4)
    ```
 
    In this tutorial, a cluster object is created for the local machine, using
    multiprocessing (processes=True) on 4 workers (n_workers=4) each using only
    1 core (threads_per_worker=1).
    """
    cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=True)
    client = Client(cluster)
    return client

In [4]:
#global variables                                                                                                                                                
fit_lowcut = 84.
fit_highcut = 98.
NbinsX = 50

nmaxiteration = 200
recreate_files= True


sched_port = 25176#Dask port
nmaxpartition = 10 # to set at lower value
distributed = True#False#
    
folder = "./output/mytest_Zee/"
if not os.path.exists(folder):
    os.mkdir(folder)
repohisto = folder+"plots/"
if not os.path.exists(repohisto):
    os.mkdir(repohisto)    
    


In [5]:
text_file = open("functions.h", "r")
data = text_file.read()
def my_initialization_function():
    print(ROOT.gInterpreter.ProcessLine(".O"))
    ROOT.gInterpreter.Declare('{}'.format(data))
    print("end of initialization")
#     

In [6]:
def bookhisto(df, var, nmaxiteration):
    h_ = {}
    
    for i_sf in range(0,nmaxiteration):
        for v in var:           
            h_[v._name+"_"+str(i_sf)]= df.Histo1D(ROOT.RDF.TH1DModel(v._name+"_"+str(i_sf), v._title+"; Events", v._nbins, v._xmin, v._xmax), v._name+"_"+str(i_sf))# 
        
    print("Done bookhisto!")
    return h_    


def savehisto(h, var, nmaxiteration, repohisto):
    label="m_ee_test"
    
    Z_resolution = []
    
    if recreate_files== True:
        outfile = ROOT.TFile.Open(repohisto+label+'.root', "RECREATE")
    else:
        outfile = ROOT.TFile.Open(repohisto+label+'.root', "Update")
    for i_sf in range(0,nmaxiteration):
        for v in var:
            # print(h.keys())
            tmp = h[v._name+"_"+str(i_sf)].GetValue()
            outfile.cd()
            tmp.Write()
            tmp.Sumw2()
            if v._name == "Z_ee":
                Z_resolution.append(tmp.GetMean())
    
    outfile.Close()

In [7]:
# set up everything properly
file_str = "input_times_100.root"
file = "/opt/workspace/persistent-storage/INFN_na_interactive_analysis/"+file_str

#ROOT.ROOT.EnableImplicitMT()

if distributed == True:
    from dask.distributed import Client
    client = Client("localhost:"+str(sched_port))
    client.restart()
    client.register_worker_plugin(UploadFile(file))
    def set_proxy(dask_worker):
        import shutil
        working_dir = dask_worker.local_directory
        shutil.copyfile(working_dir + '/'+file_str, working_dir + '/../../../'+file_str) 
        return working_dir
    client.run(set_proxy)
    # client.run(set_proxy)
    # ROOT.ROOT.EnableImplicitMT()
    RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
    ROOT.RDF.Experimental.Distributed.initialize(my_initialization_function)
else:
    RDataFrame = ROOT.RDataFrame
    my_initialization_function()


+---------+--------+-----------+---------+
| Package | client | scheduler | workers |
+---------+--------+-----------+---------+
| msgpack | 1.0.3  | 1.0.2     | 1.0.3   |
| toolz   | 0.11.2 | 0.11.1    | 0.11.2  |
+---------+--------+-----------+---------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6


0
end of initialization


In [8]:
# Create an RDataFrame that will use Dask as a backend for computations
if distributed ==True:
    #connection = create_connection()
    df = RDataFrame("events", file_str, npartitions=nmaxpartition, 
                            daskclient=client, monitor_label = "main")
else:
    df = RDataFrame("events", file)

In [9]:
var = my_vars

for v in var:
    print(v._name)

df = df.Define('w_nominal', '1')
df = df.Define("m_e","0.0005124") #GeV                                                                                                                           
df_ge = df.Define("goodelectrons", "Particle.charge[0]*Particle.charge[1] < 0.").Filter("goodelectrons > 0")

# Inizia a misurare il tempo
start_time = time.time()


width_mass_mc = 2.49 #GeV                                                                                                                                        
sigma_mass_mc = 2.6 #GeV                                                                                                                                         



df_Mee = df_ge
for i_sf in range(0,nmaxiteration):
    # print("adding columns with i_sf=", i_sf)
    df_Mee = df_Mee.Define("m_ee_"+str(i_sf), "ComputeInvariantMass(Particle.momentum.x, Particle.momentum.y, Particle.momentum.z, ComputeEnergy(Particle.momentum.x, Particle.momentum.y, Particle.momentum.z,m_e))")

    '''                                                                                                                                                          
    che pesi usare?                                                                                                                                              
    df = df.Define("w_nominal","scaleFactor_ELECTRON * scaleFactor_ElectronTRIGGER * scaleFactor_PILEUP * mcWeight");                                               
    '''
    # print(my_vars[0]._name+"_"+str(i_sf))
    df_Mee = df_Mee.Define(my_vars[0]._name+"_"+str(i_sf),"ComputeEnergy(Particle.momentum.x, Particle.momentum.y, Particle.momentum.z,m_e)[0]")
    df_Mee = df_Mee.Define(my_vars[1]._name+"_"+str(i_sf),"ComputeEnergy(Particle.momentum.x, Particle.momentum.y, Particle.momentum.z,m_e)[1]")

tmp=bookhisto(df_Mee, var, nmaxiteration)
savehisto(tmp, var, nmaxiteration, repohisto)

# Termina la misurazione del tempo
end_time = time.time()

# Calcola il tempo trascorso
elapsed_time = end_time - start_time

# Stampa il risultato
print("Tempo impiegato in secondi: ", elapsed_time)    

e1_energy
e2_energy
m_ee
Done bookhisto!
Tempo impiegato in secondi:  159.25995349884033


