-
Notifications
You must be signed in to change notification settings - Fork 0
/
__init__.py
315 lines (264 loc) · 10.4 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
"""Madgraph interface."""
import json
import re
import subprocess
import numpy as np
import pandas as pd
import pineappl
from ... import configs, install, log, tools
from .. import interface
from . import paths
URL = "https://launchpad.net/mg5amcnlo/{major}.0/{major}.{minor}.x/+download/MG5_aMC_v{version}.tar.gz"
"URL template for MG5aMC\\@NLO release"
VERSION = "3.4.2"
"Version in use"
CONVERT_MODEL = """
set auto_convert_model True
import model loop_qcd_qed_sm_Gmu
quit
"""
"Instructions to set the correct model for MG5aMC\\@NLO."
def url():
"""Compute actual download URL."""
major, minor, _ = VERSION.split(".")
return URL.format(version=VERSION, major=major, minor=minor)
class Mg5(interface.External):
"""Interface provider."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user_cuts = []
self.patches = []
self.tau_min = None
@property
def mg5_dir(self):
"""Return output dir."""
return self.dest / self.name
@staticmethod
def install():
"""Execute installer."""
install.pineappl()
install.mg5amc()
@property
def pdf_id(self):
"""Convert PDF to SetIndex."""
import lhapdf # pylint: disable=import-error
return lhapdf.mkPDF(self.pdf).info().get_entry("SetIndex")
def run(self):
"""Execute program."""
# copy the output file to the directory and replace the variables
output = (self.source / "output.txt").read_text().replace("@OUTPUT@", self.name)
output_file = self.dest / "output.txt"
output_file.write_text(output)
# create output folder
log.subprocess(
[str(configs.configs["commands"]["mg5"]), str(output_file)],
cwd=self.dest,
out=(self.dest / "output.log"),
)
# copy patches if there are any; use xargs to properly signal failures
for p in self.source.iterdir():
if p.suffix == ".patch":
subprocess.run(
"patch -p1".split(),
input=p.read_text(),
text=True,
cwd=self.mg5_dir,
)
# enforce proper analysis
# - copy analysis.f
analysis = (self.source / "analysis.f").read_text()
(self.mg5_dir / "FixedOrderAnalysis" / f"{self.name}.f").write_text(analysis)
# - update analysis card
analysis_card = self.mg5_dir / "Cards" / "FO_analyse_card.dat"
analysis_card.write_text(
analysis_card.read_text().replace("analysis_HwU_template", self.name)
)
# copy the launch file to the directory and replace the variables
launch = (self.source / "launch.txt").read_text().replace("@OUTPUT@", self.name)
# TODO: write a list with variables that should be replaced in the
# launch file; for the time being we create the file here, but in the
# future it should be read from the theory database EDIT: now available
# in self.theory
variables = json.loads((paths.subpkg.parents[1] / "variables.json").read_text())
variables["LHAPDF_ID"] = self.pdf_id
# replace the variables with their values
for name, value in variables.items():
launch = launch.replace(f"@{name}@", str(value))
# finally write launch
launch_file = self.dest / "launch.txt"
launch_file.write_text(launch)
# parse launch file for user-defined cuts
user_cuts_pattern = re.compile(
r"^#user_defined_cut set (\w+)\s+=\s+([+-]?\d+(?:\.\d+)?|True|False)$"
)
for line in launch.splitlines():
m = re.fullmatch(user_cuts_pattern, line)
if m is not None:
self.user_cuts.append((m[1], m[2]))
# if there are user-defined cuts, implement them
apply_user_cuts(self.mg5_dir / "SubProcesses" / "cuts.f", self.user_cuts)
# parse launch file for user-defined minimum tau
user_taumin_pattern = re.compile(r"^#user_defined_tau_min (.*)")
user_taumin = None
for line in launch.splitlines():
m = re.fullmatch(user_taumin_pattern, line)
if m is not None:
try:
user_taumin = float(m[1])
except ValueError:
raise ValueError("User defined tau_min is expected to be a number")
if user_taumin is not None:
set_tau_min_patch = (
(paths.patches / "set_tau_min.patch")
.read_text()
.replace("@TAU_MIN@", f"{user_taumin}d0")
)
(self.dest / "set_tau_min.patch").write_text(set_tau_min_patch)
self.tau_min = user_taumin
tools.patch(set_tau_min_patch, self.mg5_dir)
# parse launch file for other patches
enable_patches_pattern = re.compile(r"^#enable_patch (.*)")
enable_patches_list = []
for line in launch.splitlines():
m = re.fullmatch(enable_patches_pattern, line)
if m is not None:
enable_patches_list.append(m[1])
if len(enable_patches_list) != 0:
for patch in enable_patches_list:
patch_file = paths.patches / patch
patch_file = patch_file.with_suffix(patch_file.suffix + ".patch")
if not patch_file.exists():
raise ValueError(
f"Patch '{patch}' requested, but does not exist in patches folder"
)
self.patches.append(patch)
tools.patch(patch_file.read_text(), self.mg5_dir)
# launch run
log.subprocess(
[str(configs.configs["commands"]["mg5"]), str(launch_file)],
cwd=self.dest,
out=self.dest / "launch.log",
)
def generate_pineappl(self):
"""Generate grid."""
# if rerunning without regenerating, let's remove the already merged
# grid (it will be soon reobtained)
if self.timestamp is not None:
self.grid.unlink()
# merge the final bins
mg5_grids = sorted(
str(p.absolute())
for p in self.mg5_dir.glob("Events/run_01*/amcblast_obs_*.pineappl")
)
# read the first one from file
grid = pineappl.grid.Grid.read(mg5_grids[0])
# subsequently merge all the others (disk -> memory)
for path in mg5_grids[1:]:
grid.merge_from_file(path)
# optimize the grids
grid.optimize()
# add results to metadata
runcard = next(
iter(self.mg5_dir.glob("Events/run_01*/run_01*_tag_1_banner.txt"))
)
grid.set_key_value("runcard", runcard.read_text())
# add generated cards to metadata
grid.set_key_value("output.txt", (self.dest / "output.txt").read_text())
grid.set_key_value("launch.txt", (self.dest / "launch.txt").read_text())
# add patches and cuts used to metadata
grid.set_key_value("patches", "\n".join(self.patches))
grid.set_key_value(
"tau_min", str(self.tau_min) if self.tau_min is not None else ""
)
grid.set_key_value(
"user_cuts", "\n".join(f"{var}={value}" for var, value in self.user_cuts)
)
grid.write(str(self.grid))
def results(self):
"""Collect PDF results."""
madatnlo = next(
iter(self.mg5_dir.glob("Events/run_01*/MADatNLO.HwU"))
).read_text()
table = filter(
lambda line: re.match("^ [+-]", line) is not None, madatnlo.splitlines()
)
df = pd.DataFrame(
np.array([[float(x) for x in line.split()] for line in table])
)
# start column from 1
df.columns += 1
df["result"] = df[3]
df["error"] = df[4]
df["sv_min"] = df[6]
df["sv_max"] = df[7]
return df
def collect_versions(self):
"""Add versions."""
versions = {}
versions["mg5amc_revno"] = (
subprocess.run(
"brz revno".split(),
cwd=configs.configs["paths"]["mg5amc"],
stdout=subprocess.PIPE,
)
.stdout.decode()
.strip()
)
mg5amc_repo = (
subprocess.run(
"brz info".split(),
cwd=configs.configs["paths"]["mg5amc"],
stdout=subprocess.PIPE,
)
.stdout.decode()
.strip()
)
repo = re.search(r"\s*parent branch:\s*(.*)", mg5amc_repo)
if repo is None:
print("Invalid mg5 repository")
versions["mg5amc_repo"] = repo[1] if repo is not None else None
return versions
def find_marker_position(insertion_marker, contents):
"""Find in file."""
marker_pos = -1
for lineno, value in enumerate(contents):
if insertion_marker in value:
marker_pos = lineno
break
if marker_pos == -1:
raise ValueError(
"Error: could not find insertion marker `{insertion_marker}` in cut file `{file_path}`"
)
return marker_pos
def apply_user_cuts(cuts_file, user_cuts):
"""Apply a user defined cut, patching a suitable cuts file."""
with open(cuts_file) as fd:
contents = fd.readlines()
# insert variable declaration
marker_pos = find_marker_position("logical function passcuts_user", contents)
marker_pos = marker_pos + 8
for fname in paths.cuts_variables.iterdir():
name = fname.stem
if any(i[0].startswith(name) for i in user_cuts):
contents.insert(marker_pos, fname.read_text())
marker_pos = find_marker_position("USER-DEFINED CUTS", contents)
# skip some lines with comments
marker_pos = marker_pos + 4
# insert and empty line
contents.insert(marker_pos - 1, "\n")
for name, value in reversed(user_cuts):
# map to fortran syntax
if value == "True":
value = ".true."
elif value == "False":
value = ".false."
else:
try:
float(value)
except ValueError:
raise ValueError(f"Error: format of value `{value}` not understood")
value = value + "d0"
code = (paths.cuts_code / f"{name}.f").read_text().format(value)
contents.insert(marker_pos, code)
with open(cuts_file, "w") as fd:
fd.writelines(contents)