-
Notifications
You must be signed in to change notification settings - Fork 103
/
smear_jets.py
231 lines (174 loc) · 8.44 KB
/
smear_jets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import os
import urllib.request
import ROOT
from copy import deepcopy
"""
This example runs the jet clustering sequence with Durham N=2 exclusive algorithm and produces jet scores for the various
flavour with variations of the impact parameter resolution and neutral hadrons energy resolutions.
To run this example:
fccanalysis run examples/FCCee/smearing/smear_jets.py \
--files-list /eos/experiment/fcc/ee/generation/DelphesEvents/winter2023/IDEA/wzp6_ee_nunuH_Hss_ecm240/events_196755633.root \
--nevents 100 \
"""
# ____________________________________________________________
def get_file_path(url, filename):
    """Return a usable local path for a required input file.

    If *filename* already exists locally (e.g. on /eos), return its
    absolute path. Otherwise download *url* into the current working
    directory and return the downloaded file's (relative) path — the
    target name is the URL's basename.
    """
    if os.path.exists(filename):
        return os.path.abspath(filename)
    # Fallback for environments without /eos access (e.g. CI):
    # fetch the file from the web once, into the working directory.
    target = os.path.basename(url)
    urllib.request.urlretrieve(url, target)
    return target
# ____________________________________________________________
def jet_sequence(df, collections, output_branches, tag=""):
    """Run the Durham N=2 exclusive jet clustering followed by flavour
    tagging on *df*, registering all produced branches in *output_branches*.

    *tag* suffixes the output column names so the sequence can be run
    several times (nominal + smeared variations) on the same dataframe.
    Returns the dataframe with all new columns defined.
    """
    suffix = "" if tag == "" else "_{}".format(tag)

    # Cluster into exactly two exclusive jets.
    clustering = ExclusiveJetClusteringHelper(collections["PFParticles"], 2, tag)
    df = clustering.define(df)
    output_branches += clustering.outputBranches()

    # Jet four-vectors and the dijet invariant mass.
    jets_p4 = "tlv_jets" + suffix
    mjj = "mjj" + suffix
    df = df.Define(jets_p4, "JetConstituentsUtils::compute_tlv_jets({})".format(clustering.jets))
    df = df.Define(mjj, "JetConstituentsUtils::InvariantMass({}[0], {}[1])".format(jets_p4, jets_p4))
    output_branches.append(mjj)

    # Flavour tagging: define the tagger observables, then run inference
    # with the weaver preprocessing/model files resolved at import time.
    tagger = JetFlavourHelper(
        collections,
        clustering.jets,
        clustering.constituents,
        tag,
    )
    df = tagger.define(df)
    df = tagger.inference(weaver_preproc, weaver_model, df)
    output_branches += tagger.outputBranches()
    return df
# ____________________________________________________________
## input file needed for unit test in CI
testFile = "https://fccsw.web.cern.ch/fccsw/testsamples/wzp6_ee_nunuH_Hss_ecm240.root"

## latest particle transformer model, trained on 9M jets in winter2023 samples
model_name = "fccee_flavtagging_edm4hep_wc_v1"

## model files (preprocessing JSON + ONNX weights) hosted for CI unit tests
url_model_dir = "https://fccsw.web.cern.ch/fccsw/testsamples/jet_flavour_tagging/winter2023/wc_pt_13_01_2022/"
url_preproc = "{}/{}.json".format(url_model_dir, model_name)
url_model = "{}/{}.onnx".format(url_model_dir, model_name)

## same model files locally stored on /eos
model_dir = "/eos/experiment/fcc/ee/jet_flavour_tagging/winter2023/wc_pt_13_01_2022/"
local_preproc = "{}/{}.json".format(model_dir, model_name)
local_model = "{}/{}.onnx".format(model_dir, model_name)

## prefer the local /eos copy, else download from the URL
## (note: runs at import time and may hit the network)
weaver_preproc = get_file_path(url_preproc, local_preproc)
weaver_model = get_file_path(url_model, local_model)

## FCCAnalyses add-on helpers used by jet_sequence()
from addons.ONNXRuntime.jetFlavourHelper import JetFlavourHelper
from addons.FastJet.jetClusteringHelper import ExclusiveJetClusteringHelper

## branch names accumulated by jet_sequence() and returned by RDFanalysis.output()
output_branches = []
# Mandatory: RDFanalysis class where the user defines the operations on the TTree
class RDFanalysis:
    # __________________________________________________________
    # Mandatory: analysers function to define the analysers to process;
    # please make sure you return the last dataframe
    def analysers(df):
        """Build the analysis graph: run the nominal jet sequence, then
        re-run it for each smearing variation (impact parameter, neutral
        hadron energy, dN/dx, time-of-flight) at several scale factors."""
        ## name of collections in EDM root files
        collections = {
            "GenParticles": "Particle",
            "MCRecoMap": "MCRecoAssociations",
            "PFParticles": "ReconstructedParticles",
            "PFTracks": "EFlowTrack",
            "PFPhotons": "EFlowPhoton",
            "PFNeutralHadrons": "EFlowNeutralHadron",
            "TrackState": "EFlowTrack_1",
            "TrackerHits": "TrackerHits",
            "CalorimeterHits": "CalorimeterHits",
            "dNdx": "EFlowTrack_2",
            "PathLength": "EFlowTrack_L",
            "Bz": "magFieldBz",
        }

        ## run full sequence with nominal detector
        df = jet_sequence(df, collections, output_branches)

        ## define MC/Reco links, needed for smearing
        df = (
            df.Alias("mc_reco_0", "{}#0.index".format(collections["MCRecoMap"])).Alias(
                "mc_reco_1", "{}#1.index".format(collections["MCRecoMap"])
            )
            # matching between the RecoParticles and the MCParticles:
            .Define(
                "reco_mc_index",
                "ReconstructedParticle2MC::getRP2MC_index(mc_reco_0,mc_reco_1,{})".format(collections["PFParticles"]),
            )
        )

        ## produce smeared collections:
        ## for each scale factor, run the same jet sequence on each smeared variant.
        ## Each variation copies the nominal collection map and overrides one entry,
        ## so the other inputs stay nominal.
        scale_factors = [1.0, 2.0, 5.0, 10.0]
        for sf in scale_factors:
            ## 1. do Impact parameter smearing first
            collections_ip = deepcopy(collections)
            ip_tag = "ip{}".format(sf).replace(".", "p")  # e.g. 2.0 -> "ip2p0" (valid column name)
            collections_ip["TrackState"] = "TrackState_{}".format(ip_tag)
            # Generate a new set of tracks, re-scaling the covariance matrix.
            # Order of the scaling factors: smear_d0, smear_phi, smear_omega, smear_z0, smear_tlambda;
            # the boolean flag is for debugging.
            # Here smear only d0, z0.
            df = df.Define(
                collections_ip["TrackState"],
                ROOT.SmearObjects.SmearedTracks(sf, 1.0, 1.0, sf, 1.0, False),
                [collections["PFParticles"], collections["TrackState"], "reco_mc_index", collections["GenParticles"]],
            )
            ## run full sequence with covariance smeared detector
            df = jet_sequence(df, collections_ip, output_branches, ip_tag)

            ## 2. do Neutral Hadron energy smearing
            collections_res = deepcopy(collections)
            res_tag = "res{}".format(sf).replace(".", "p")
            collections_res["PFParticles"] = "ReconstructedParticles_{}".format(res_tag)
            df = df.Define(
                collections_res["PFParticles"],
                # type: 11 (electrons), 13 (muons), 130 (neutral hadrons), 22 (photon), 0 (charged hadrons), -1 (all)
                # mode: 0 energy, 1 momentum
                # parameters (scale, type, mode, debug)
                # here re-smear only neutral hadrons (130) in energy mode
                ROOT.SmearObjects.SmearedReconstructedParticle(sf, 130, 0, False),
                [collections["PFParticles"], "reco_mc_index", collections["GenParticles"]],
            )
            ## run full sequence with neutral-hadron energy smeared
            df = jet_sequence(df, collections_res, output_branches, res_tag)

            ## 3. do dNdx smearing
            collections_dndx = deepcopy(collections)
            dndx_tag = "dndx{}".format(sf).replace(".", "p")
            collections_dndx["dNdx"] = "dNdx_{}".format(dndx_tag)
            df = df.Define(
                collections_dndx["dNdx"],
                ROOT.SmearObjects.SmearedTracksdNdx(sf, False),  # (scale, debug)
                [
                    collections["PFParticles"],
                    collections["dNdx"],
                    collections["PathLength"],
                    "reco_mc_index",
                    collections["GenParticles"],
                ],
            )
            ## run full sequence with dNdx smeared tracks
            df = jet_sequence(df, collections_dndx, output_branches, dndx_tag)

            ## 4. do tof smearing
            collections_tof = deepcopy(collections)
            tof_tag = "tof{}".format(sf).replace(".", "p")
            collections_tof["TrackerHits"] = "tof_{}".format(tof_tag)
            df = df.Define(
                collections_tof["TrackerHits"],
                ROOT.SmearObjects.SmearedTracksTOF(sf, False),  # (scale, debug)
                [
                    collections["PFParticles"],
                    collections["PFTracks"],
                    collections["TrackerHits"],
                    collections["PathLength"],
                    "reco_mc_index",
                    collections["GenParticles"],
                ],
            )
            ## run full sequence with time-of-flight smeared hits
            df = jet_sequence(df, collections_tof, output_branches, tof_tag)

        return df

    # __________________________________________________________
    # Mandatory: output function, please make sure you return the branchlist as a python list
    def output():
        # output_branches was filled (module-level) by every jet_sequence() call above
        return output_branches