Flavor physics analysis looking for decays of D or B mesons. In this example we run the reconstruction of Ds->phi(mumu)pi decays, looking for two collimated muons and a charged track originating from a common vertex. The analysis is performed on ntuples produced running on CMS data in the MINIAOD format.

## Basic imports

In [None]:
import sys, os, time
start = time.time()
import json
import ROOT
from ROOT import * 

## Dask scheduler

In [None]:
from dask.distributed import Client

Now start new Dask cluster, scale the number of workers

In [None]:
# fill this cell with "<>" button of Dask labextension

In [None]:
# alternative to previous cell
sched_port = 20974 ## Change this from the DASK cluster information panel
client = Client("localhost:" + str(sched_port))
client

In [None]:
#client.restart() #Execute this only to restart the workers (to relaunch the notebook, for example)

## Declare custom C++ functions 

In [None]:
text_file = open("Utilities.h", "r")
data = text_file.read()


def my_initialization_function():
    ROOT.gInterpreter.Declare('{}'.format(data))

ROOT.RDF.Experimental.Distributed.initialize(my_initialization_function)

## X509 proxy configuration
The `/tmp/x509up_u` file should be generated prior running the notebook using `voms-proxy-init `

In [None]:
from distributed.diagnostics.plugin import UploadFile
client.register_worker_plugin(UploadFile("/tmp/x509up_u0"))

In [None]:
def set_proxy(dask_worker):
    import os
    import shutil
    working_dir = dask_worker.local_directory
    proxy_name = 'x509up_u0'
    os.environ['X509_USER_PROXY'] = working_dir + '/' + proxy_name
    os.environ['X509_CERT_DIR']="/cvmfs/grid.cern.ch/etc/grid-security/certificates/"
    return os.environ.get("X509_USER_PROXY"), os.environ.get("X509_CERT_DIR") 

In [None]:
client.run(set_proxy)

In [None]:
# Loading PROXY locally
os.environ['X509_USER_PROXY'] = "/tmp/x509up_u0"
os.environ['X509_CERT_DIR'] = "/cvmfs/grid.cern.ch/etc/grid-security/certificates/"

In [None]:
def clear_nodes(dask_worker):
    import os
    os.popen('rm ./*.root')
    return True

In [None]:
client.run(clear_nodes)

Configuration files or additional input files (i.e. scale factors, PU reweighting) should be pushed to the worker

In [None]:
#myfile = localpath/tomyfile
#client.register_worker_plugin(UploadFile(config[myfile]))

## Define chain of rootfiles to analyze
ntuples corresponding to different datasets and eras are defined in a configuration json file

In [None]:
configjson = "/opt/workspace/persistent-storage/BPH_usecase/input.json"

with open(configjson, "r") as f:
    config = json.loads(f.read())

Set here which dataset to use

In [None]:
dataset = "2018A"

path = "xroot://xr-4-4-9-3.recas.ba.infn.it:8080/"+config[dataset]["rootpath"]
print(path)
treename = config[dataset]["treename"]

In [None]:
#generating the list of all .root files in given directory and subdirectories
chain = []
for r, d, f in os.walk(path): # r=root, d=directories, f = files
    for file in f:
        if '.root' in file:
            chain.append(os.path.join(r, file))

print(chain)

In [None]:
#df = ROOT.RDataFrame(treename, "/opt/workspace/persistent-storage/BPH_usecase/Tree_PhiPi_1-1.root") #not distributed

numWorkers= len(client.scheduler_info()['workers'])
npartitions = 3 * numWorkers

print("Number of workers is: {}".format(numWorkers))
print("Number of total partitions is: {}".format(npartitions))

#df = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame(treename, chain, npartitions=npartitions, daskclient=client)   
df = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame(treename, "/opt/workspace/persistent-storage/BPH_usecase/Tree_PhiPi_1-1.root", npartitions=npartitions, daskclient=client)   

# DsPhiPi Analysis cutflow

In [None]:
# 1 -> Event fires L1 and HLT

df = df.Define("HLT_mask", "Trigger_hltdecision==1 && contains(Trigger_hltname,\"HLT_DoubleMu3_Trk_Tau3mu_v\")").Filter("ROOT::VecOps::Sum(HLT_mask) >0")
df = df.Define("L1_mask", "Trigger_l1decision==1 && ( contains(Trigger_l1name,\"L1_DoubleMu\") || contains(Trigger_l1name,\"L1_TripleMu\"))").Filter("ROOT::VecOps::Sum(L1_mask) >0")

# Selections on triplets
# 2 -> 2mu+track candidate mass in (1.62-2.02)GeV
# 3 -> at least 2 track associated with PV
# 4 -> Significance of BS-SV distance in the transverse plane > 2
triplet_selection = "Triplet2_Mass>1.62 && Triplet2_Mass<2.02 && \
                     RefittedPV2_NTracks > 1 && \
                     FlightDistBS_SV_Significance > 2 "

# Events with at least one good candidate
df = df.Define("triplet_mask1", triplet_selection).Filter("ROOT::VecOps::Sum(triplet_mask1) >0")

# 5 -> Muons and track within CMS acceptance
acceptance_selection = "((abs(Mu01_Eta)<1.2 && Mu01_Pt>3.5) || (abs(Mu01_Eta)>=1.2 && abs(Mu01_Eta)<2.4 && Mu01_Pt>2.0)) &&\
                        ((abs(Mu02_Eta)<1.2 && Mu02_Pt>3.5) || (abs(Mu02_Eta)>=1.2 && abs(Mu02_Eta)<2.4 && Mu02_Pt>2.0)) &&\
                        Tr_Pt>1.2"
# Events with at least one good candidate
df = df.Define("triplet_mask2", acceptance_selection).Filter("ROOT::VecOps::Sum(triplet_mask2)>0")

# Compute dR and dZ between muons/tracks
df = df.Define("dR12", "deltaR_vec(Mu01_Eta, Mu02_Eta, Mu01_Phi, Mu02_Phi)")
df = df.Define("dR13", "deltaR_vec(Mu01_Eta, Tr_Eta, Mu01_Phi, Tr_Phi)")
df = df.Define("dR23", "deltaR_vec(Mu02_Eta, Tr_Eta, Mu02_Phi, Tr_Phi)")

# 6 -> min and max deltaR requirement
dR_selection = "dR12>DELTAR_MIN && dR13>DELTAR_MIN && dR23>DELTAR_MIN &&\
                dR12<DELTAR_MAX && dR13<DELTAR_MAX && dR23<DELTAR_MAX"
df = df.Define("triplet_mask3", dR_selection).Filter("ROOT::VecOps::Sum(triplet_mask3)>0")

In [None]:
# Find index in "Muon_" and "Track_" branches
df = df.Define("Mu01_index", "match(MuonPt, Mu01_Pt)")
df = df.Define("Mu02_index", "match(MuonPt, Mu02_Pt)")
df = df.Define("Tr_index",   "match(MuonPt, Tr_Pt)")

# 7 -> Apply Muon ID Global and Particle Flow
df = df.Define("Mu01_ID", "muon_id(Mu01_index, Muon_isGlobal && Muon_isPF)")
df = df.Define("Mu02_ID", "muon_id(Mu02_index, Muon_isGlobal && Muon_isPF)")

# 8 -> IP(track, BS) z direction < 20 cm and xy direction < 0.3 cm
df = df.Define("Tr_IPcut", "muon_id(Tr_index, (Track_dz<20 && Track_dxy<0.3) )")
df = df.Define("triplet_mask4", "Mu01_ID && Mu02_ID && Tr_IPcut").Filter("ROOT::VecOps::Sum(triplet_mask4)>0")

# 9 -> dimuon mass compatible with phi(1020) RefTrack1_Eta
df = df.Define("Dimu_mass", "dimu_mass(RefTrack1_Pt, RefTrack1_Eta, RefTrack1_Phi, RefTrack2_Pt, RefTrack2_Eta, RefTrack2_Phi)")
df = df.Define("triplet_mask5", "Dimu_mass>1.0 && Dimu_mass<1.04").Filter("ROOT::VecOps::Sum(triplet_mask5)>0")

# 10 -> Trigger Matching (to-do)

# Keep best candidate based on vertex chi2
df = df.Define("BestTriplet_index", "bestcandidate(TripletVtx2_Chi2)")
df = df.Define("BestTriplet_mass", "flattening(Triplet2_Mass, BestTriplet_index)")

In [None]:
# Create a histogram from `x` and draw it
h = df.Histo1D(("h_mass", "h_mass", 80, 1.65, 2.05), "BestTriplet_mass")
c =  ROOT.TCanvas()
h.Draw("hist")
c.Draw()
# Save output for further processing
df_out = df.Snapshot("ntuple", "out.root", ["BestTriplet_mass"]);

In [None]:
#fitting invariant mass :)
from ROOT import RooRealVar

x = ROOT.RooRealVar("BestTriplet_mass", "2mu+1trk inv. mass (GeV)", 1.65, 2.05)
x.setBins(80)

# We first declare the RooDataHistHelper
rdhMaker = ROOT.RooDataHistHelper("dataset", "Title of dataset", ROOT.RooArgSet(x))

# Then, we move it into an RDataFrame action:
data_result = df_out.Book(ROOT.std.move(rdhMaker), ("BestTriplet_mass"))
data = roo_data_hist_result.GetValue()
entries = data.numEntries()

#data = RooDataHist("data", "h_mass", RooArgSet(x), RooFit.Import(h, ROOT.kFALSE))

x.setRange("R1", 1.70, 1.80)
x.setRange("R2", 1.89, 1.925)
x.setRange("R3", 1.99, 2.02)

meanCB = RooRealVar("mean", "meanCB", 1.97, 1.94, 2.1)
sigmaCB1 = RooRealVar("#sigma_{CB}", "sigmaCB1", 0.02, 0.001, 0.1)
alpha1 = RooRealVar("#alpha1", "alpha1", 1.0, 0.5, 10.0)
nSigma1 = RooRealVar("n1", "n1", 1.0, 0.1, 25.0)
sig_right = RooCBShape("sig_right", "sig_right", x, meanCB, sigmaCB1, alpha1, nSigma1)

meanCB2 = RooRealVar("mean2", "meanCB2", 1.87, 1.82, 1.89)
sigmaCB2 = RooRealVar("#sigma2_{CB}", "sigmaCB2", 0.05, 0.001, 0.05)
alpha2 = RooRealVar("#alpha2", "alpha2", 1.0, 0.5, 10.0)
nSigma2 = RooRealVar("n2", "n2", 1.0, 0.1, 25.0)
sig_left = RooCBShape("sig_left", "sig_left", x, meanCB2, sigmaCB2, alpha2, nSigma2)

gamma = RooRealVar("#Gamma", "Gamma", -1, -2.0, -1e-2)
exp_bkg = RooExponential("exp_bkg", "exp_bkg", x, gamma)
exp_bkg.fitTo(data, RooFit.Range("R1,R2,R3"))

nSig_right = RooRealVar("nSig_R", "Number of signal candidates", entries*0.05, 1.0, 1e+6)
nSig_left = RooRealVar("nSig_L", "Number of signal 2 candidates", entries*0.02, 1.0, 1e+6)
nBkg = RooRealVar("nBkg", "Bkg component", entries*0.8, 1.0, 1e+6)

totalPDF = RooAddPdf("totalPDF", "totalPDF", RooArgList(sig_right, sig_left, exp_bkg), RooArgList(nSig_right, nSig_left, nBkg))

r = totalPDF.fitTo(data, RooFit.Extended(ROOT.kTRUE), RooFit.Save(ROOT.kTRUE))

xframe = x.frame()
xframe.SetTitle("")
xframe.SetXTitle("2mu +1trk inv. mass (GeV)")
#totalPDF.paramOn(xframe, RooFit.Parameters(RooArgSet(meanCB, meanCB2, sigmaCB1, sigmaCB2, gamma, nSig_right, nSig_left, nBkg)), RooFit.Layout(0.2, 0.2, 0.6))
data.plotOn(xframe)
