/
run_selection.py
555 lines (459 loc) · 19.6 KB
/
run_selection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
"""Context methods dealing with run scanning and selection."""
import fnmatch
import re
import typing as ty
import warnings
from collections import defaultdict
import numpy as np
import pandas as pd
import pytz
import datetime
import strax
# use tqdm as loaded in utils (from tqdm.notebook when in a juypyter env)
tqdm = strax.utils.tqdm
export, __all__ = strax.exporter()
@strax.Context.add_method
def list_available(self, target, runs=None, **kwargs) -> list:
"""Return sorted list of run_id's for which target is available.
:param target: Data type to check
:param runs: Runs to check. If None, check all runs.
"""
if len(kwargs):
# noinspection PyMethodFirstArgAssignment
self = self.new_context(**kwargs)
if self.runs is None:
self.scan_runs()
if runs is None:
runs = self.runs["name"].values # type: ignore
else:
runs = strax.to_str_tuple(runs)
keys = set(self.keys_for_runs(target, runs))
found: ty.Set[strax.DataKey] = set()
for sf in self.storage:
remaining = keys - found
is_found = sf.find_several(list(remaining), **self._find_options)
found |= set([k for i, k in enumerate(remaining) if is_found[i]])
return list(sorted([x.run_id for x in found]))
@strax.Context.add_method
def keys_for_runs(
self, target: str, run_ids: ty.Union[np.ndarray, list, tuple, str]
) -> ty.List[strax.DataKey]:
"""Get the data-keys for a multitude of runs. If use_per_run_defaults is False which it
preferably is (#246), getting many keys should be fast as we only only compute the lineage once.
:param run_ids: Runs to get datakeys for
:param target: datatype requested
:return: list of datakeys of the target for the given runs.
"""
run_ids = strax.to_str_tuple(run_ids)
if self.context_config["use_per_run_defaults"]:
return [self.key_for(r, target) for r in run_ids]
elif len(run_ids):
# Get the lineage once, for the context specifies that the
# defaults may not change!
p = self._get_plugins((target,), run_ids[0])[target]
return [strax.DataKey(r, target, p.lineage) for r in run_ids]
else:
return []
@strax.Context.add_method
def scan_runs(
self: strax.Context,
check_available=tuple(),
if_check_available="raise",
store_fields=tuple(),
) -> pd.DataFrame:
"""Update and return self.runs with runs currently available in all storage frontends.
:param check_available: Check whether these data types are available Availability of xxx is
stored as a boolean in the xxx_available column.
:param if_check_available: 'raise' (default) or 'skip', whether to do the check
:param store_fields: Additional fields from run doc to include as rows in the dataframe. The
context options scan_availability and store_run_fields list data types and run fields,
respectively, that will always be scanned.
"""
store_fields = tuple(
set(
list(strax.to_str_tuple(store_fields))
+ ["name", "number", "tags", "mode", strax.RUN_DEFAULTS_KEY]
+ list(self.context_config["store_run_fields"])
)
)
if if_check_available == "raise":
check_available = tuple(
set(
list(strax.to_str_tuple(check_available))
+ list(self.context_config["check_available"])
)
)
elif if_check_available == "skip":
check_available = tuple()
else:
raise ValueError(f"Invalid value for if_check_available: {if_check_available}")
for target in check_available:
save_when = self.get_save_when(target)
if save_when < strax.SaveWhen.ALWAYS:
p = self._plugin_class_registry[target]
self.log.warning(
f"{p.__name__}-plugin is {str(save_when)}. "
f"Therefore {target} is most likely not stored!"
)
docs = None
for sf in self.storage:
_temp_docs = []
for doc in sf._scan_runs(store_fields=store_fields):
# If there is no number, make one from the name
_is_superrun = doc.get("name", "not_a_superrun_if_no_name").startswith("_")
if "number" not in doc:
if "name" not in doc:
raise ValueError(f"Invalid run doc {doc}, contains neither name nor number.")
if not _is_superrun:
# If there is no name, make one from the number
doc["number"] = int(doc["name"])
if not _is_superrun:
doc.setdefault("name", f"{doc['number']:06d}")
# Convert tags/mode/source list to a,separated string if needed (mode and source can
# be lists for superruns)
doc.setdefault("mode", "")
if isinstance(doc["mode"], list):
doc["mode"] = ",".join(doc["mode"])
doc.setdefault("source", "")
if isinstance(doc["source"], list):
doc["source"] = ",".join(doc["source"])
doc["tags"] = ",".join(
[t["name"] if isinstance(t, dict) else t for t in doc.get("tags", [])]
)
# Set a default livetime if we have start and stop
if (
"start" in store_fields
and "end" in store_fields
and "livetime" in store_fields
and doc.get("start") is not None
and doc.get("end") is not None
):
doc.setdefault("livetime", doc["end"] - doc["start"])
if _is_superrun:
# In contrast to regular run-docs,
# superruns are timezone aware. So strip off timezone
# again:
start = doc.get("start")
if start:
doc["start"] = start.replace(tzinfo=None)
start = doc.get("end")
if start:
doc["end"] = start.replace(tzinfo=None)
if type(doc.get("livetime")) == float:
# If we have a superrun livetime is stored as intger seconds
# as timedelta is not json serializable
doc["livetime"] = datetime.timedelta(seconds=doc["livetime"])
# Put the strax defaults stuff into a different cache
if strax.RUN_DEFAULTS_KEY in doc:
self._run_defaults_cache[doc["name"]] = doc[strax.RUN_DEFAULTS_KEY]
del doc[strax.RUN_DEFAULTS_KEY]
doc = flatten_run_metadata(doc)
_temp_docs.append(doc)
if len(_temp_docs):
new_docs = pd.DataFrame(_temp_docs)
else:
new_docs = pd.DataFrame([], columns=store_fields)
if docs is None:
docs = new_docs
else:
# Keep only new runs (not found by earlier frontends)
docs = pd.concat([docs, new_docs[~np.in1d(new_docs["name"], docs["name"])]], sort=False)
docs.reset_index(drop=True, inplace=True)
# Rearrange columns
if not self.context_config["use_per_run_defaults"] and strax.RUN_DEFAULTS_KEY in docs.columns:
del docs[strax.RUN_DEFAULTS_KEY]
docs = docs[["name"] + [x for x in docs.columns.tolist() if x != "name"]] # type: ignore
self.runs = docs
# Add available data types,
# this is kept for the case users directly call list_available
for d in tqdm(
check_available,
desc="Checking data availability scan_runs",
disable=not len(check_available),
):
self.runs[d + "_available"] = np.in1d(self.runs.name.values, self.list_available(d))
return self.runs
@strax.Context.add_method
def select_runs(
self,
run_mode=None,
run_id=None,
include_tags=None,
exclude_tags=None,
available=tuple(),
pattern_type="fnmatch",
ignore_underscore=True,
force_reload=False,
):
"""Return pandas.DataFrame with basic info from runs that match selection criteria.
:param run_mode: Pattern to match run modes (reader.ini.name)
:param run_id: Pattern to match a run_id or run_ids
:param available: str or tuple of strs of data types for which data
must be available according to the runs DB.
:param include_tags: String or list of strings of patterns
for required tags
:param exclude_tags: String / list of strings of patterns
for forbidden tags.
Exclusion criteria have higher priority than inclusion criteria.
:param pattern_type: Type of pattern matching to use.
Defaults to 'fnmatch', which means you can use
unix shell-style wildcards (`?`, `*`).
The alternative is 're', which means you can use
full python regular expressions.
:param ignore_underscore: Ignore the underscore at the start of tags
(indicating some degree of officialness or automation).
:param force_reload: Force reloading of runs from storage.
Otherwise, runs are cached after the first time they are loaded in self.runs.
Examples:
- `run_selection(include_tags='blinded')`
select all datasets with a blinded or _blinded tag.
- `run_selection(include_tags='*blinded')`
... with blinded or _blinded, unblinded, blablinded, etc.
- `run_selection(include_tags=['blinded', 'unblinded'])`
... with blinded OR unblinded, but not blablinded.
- `run_selection(include_tags='blinded',
exclude_tags=['bad', 'messy'])`
... select blinded dsatasets that aren't bad or messy
"""
if self.runs is None or force_reload:
self.scan_runs(if_check_available="skip")
dsets = self.runs.copy()
if pattern_type not in ("re", "fnmatch"):
raise ValueError("Pattern type must be 're' or 'fnmatch'")
# Filter datasets by run mode and/or name first
for field_name, requested_value in (("name", run_id), ("mode", run_mode)):
if requested_value is None:
continue
requested_value = strax.to_str_tuple(requested_value)
values = dsets[field_name].values
mask = np.zeros(len(values), dtype=np.bool_)
if pattern_type == "fnmatch":
for i, x in enumerate(values):
mask[i] = np.any([fnmatch.fnmatch(x, rv) for rv in requested_value])
elif pattern_type == "re":
for i, x in enumerate(values):
mask[i] = np.any([re.match(rv, x) for rv in requested_value])
dsets = dsets[mask]
dsets = _include_exclude_tags(
dsets,
include_tags,
exclude_tags,
pattern_type,
ignore_underscore,
)
# Check data availability only for selected datasets
check_available = tuple(
set(list(strax.to_str_tuple(available)) + list(self.context_config["check_available"]))
)
for d in tqdm(check_available, desc="Checking data availability"):
dsets[d + "_available"] = np.in1d(
dsets.name.values, self.list_available(target=d, runs=dsets.name.values)
)
# This will help users call select_runs multiple times
# with same run mode and/or name, but different available
have_available = strax.to_str_tuple(available)
for d in have_available:
if not d + "_available" in dsets.columns:
# Get extra availability info from the run db
d_available = np.in1d(
dsets.name.values, self.list_available(target=d, runs=dsets.name.values)
)
# Save both in the context and for this selection using
# available = ('data_type',)
dsets[d + "_available"] = d_available
# Only return dsets available
for d in have_available:
dsets = dsets[dsets[d + "_available"]]
return dsets
@strax.Context.add_method
def define_run(
self: strax.Context,
name: str,
data: ty.Union[np.ndarray, pd.DataFrame, dict, list, tuple],
from_run: ty.Union[str, None] = None,
):
"""Function for defining new superruns from a list of run_ids.
Note:
The function also allows to create a superrun from data
(numpy.arrays/pandas.DataFframrs). However, this is currently
not supported from the data loading side.
:param name: Name/run_id of the superrun. Suoerrun names must start
with an underscore.
:param data: Data from which the superrun should be created. Can be
either one of the following: a tuple/list of run_ids or a
numpy.array/pandas.DataFrame containing some data.
:param from_run: List of run_ids which were used to create the
numpy.array/pandas.DataFrame passed in data.
"""
if isinstance(data, (pd.DataFrame, np.ndarray)):
if isinstance(data, np.ndarray):
data = pd.DataFrame.from_records(data)
# strax.endtime does not work with DataFrames due to numba
if "endtime" in data.columns:
end = data["endtime"]
else:
end = data["time"] + data["length"] * data["dt"]
# Array of events / regions of interest
start, end = data["time"], end
if from_run is not None:
return self.define_run(name, {from_run: np.transpose([start, end])})
elif "run_id" not in data.columns:
raise ValueError(
"Must provide from_run or data with a run_id column to define a superrun"
)
else:
df = pd.DataFrame(dict(start=start, end=end, run_id=data["run_id"]))
return self.define_run(
name,
{
run_id: rs[["start", "end"]].values.transpose()
for run_id, rs in df.groupby("run_id")
},
)
if isinstance(data, (list, tuple)):
# list of runids
data = strax.to_str_tuple(data)
return self.define_run(name, {run_id: "all" for run_id in data})
if not isinstance(data, dict):
raise ValueError(f"Can't define run from {type(data)}")
# Find start and end time of the new run = earliest start time of other runs
run_md = dict(
start=datetime.datetime.max.replace(tzinfo=pytz.utc),
end=datetime.datetime.min.replace(tzinfo=pytz.utc),
livetime=0,
)
keys = []
starts = []
tags = set()
modes = set()
sources = set()
for _subrunid in data:
doc = self.run_metadata(_subrunid, ["start", "end", "mode", "tags", "source"])
doc.setdefault(
"tags",
[
{"name": ""},
],
)
doc.setdefault("mode", "")
doc.setdefault("source", "")
tags |= set([tag["name"] for tag in doc["tags"]])
modes |= set(strax.to_str_tuple(doc["mode"]))
sources |= set(strax.to_str_tuple(doc["source"]))
if len(sources) > 1:
warnings.warn(f'You are defining a superrun with more than one source: "{sources}"')
run_doc_start = doc["start"].replace(tzinfo=pytz.utc)
run_doc_end = doc["end"].replace(tzinfo=pytz.utc)
run_md["start"] = min(run_md["start"], run_doc_start)
run_md["end"] = max(run_md["end"], run_doc_end)
time_delta = run_doc_end - run_doc_start
run_md["livetime"] += time_delta.total_seconds()
keys.append(_subrunid)
starts.append(run_doc_start)
run_md["tags"] = tuple(tags)
run_md["mode"] = tuple(modes)
run_md["source"] = tuple(sources)
# Make sure subruns are sorted in time
sort_index = np.argsort(starts)
data = {keys[i]: data[keys[i]] for i in sort_index}
# Superrun names must start with an underscore
if not name.startswith("_"):
name = "_" + name
# Dict mapping run_id: array of time ranges or all
for sf in self.storage:
if not sf.readonly and sf.can_define_runs:
sf.define_run(name, sub_run_spec=data, **run_md)
break
else:
raise RuntimeError("No storage frontend registered that allows run definition")
@strax.Context.add_method
def available_for_run(
self: strax.Context,
run_id: str,
include_targets: ty.Union[None, list, tuple, str] = None,
exclude_targets: ty.Union[None, list, tuple, str] = None,
pattern_type: str = "fnmatch",
) -> pd.DataFrame:
"""For a given single run, check all the targets if they are stored. Excludes the target if
never stored anyway.
:param run_id: requested run
:param include_targets: targets to include e.g. raw_records, raw_records* or *_nv. If multiple
targets (e.g. a list) is provided, the target should match any of the arguments!
:param exclude_targets: targets to exclude e.g. raw_records, raw_records* or *_nv. If multiple
targets (e.g. a list) is provided, the target should match none of the arguments!
:param pattern_type: either 'fnmatch' (Unix filename pattern matching) or 're' (Regular
expression operations).
:return: Table of available data per target
"""
if not isinstance(run_id, str):
raise ValueError(f"Only single run_id is allowed (str), got {run_id} ({type(run_id)})")
if exclude_targets is None:
exclude_targets = []
if include_targets is None:
include_targets = []
is_stored = defaultdict(list)
for target in self._plugin_class_registry.keys():
# Skip targets that are not stored
save_when = self.get_save_when(target)
if save_when == strax.SaveWhen.NEVER:
continue
# Should we include this target or exclude it?
include_t: ty.Any = []
exclude_t = False
for excl in strax.to_str_tuple(exclude_targets):
# Simple logic, if we match the excluded target, we should
# should not continue
if _tag_match(target, excl, pattern_type, False):
exclude_t = True
break
# We can match any of the "incl" targets, keep a list and check
# of any of the "incl" matches the target.
for incl in strax.to_str_tuple(include_targets):
include_t.append(_tag_match(target, incl, pattern_type, False))
# Convert to simple bool. If no include_targets is specified,
# all are fine, otherwise check at least one is matching.
include_t = True if not len(include_t) else any(include_t)
if include_t and not exclude_t:
is_stored["target"].append(target)
is_stored["is_stored"].append(self.is_stored(run_id, target))
return pd.DataFrame(is_stored)
def _include_exclude_tags(
dsets,
include_tags,
exclude_tags,
pattern_type,
ignore_underscore,
):
if include_tags is not None:
dsets = dsets[_tags_match(dsets, include_tags, pattern_type, ignore_underscore)]
if exclude_tags is not None:
dsets = dsets[True ^ _tags_match(dsets, exclude_tags, pattern_type, ignore_underscore)]
return dsets
def _tags_match(dsets, patterns, pattern_type, ignore_underscore):
result = np.zeros(len(dsets), dtype=np.bool_)
if isinstance(patterns, str):
patterns = [patterns]
for i, tags in enumerate(dsets.tags):
result[i] = any([
any([
_tag_match(tag, pattern, pattern_type, ignore_underscore)
for tag in tags.split(",")
for pattern in patterns
])
])
return result
def _tag_match(tag, pattern, pattern_type, ignore_underscore):
if ignore_underscore and tag.startswith("_") and not pattern.startswith("_"):
tag = tag[1:]
if pattern_type == "fnmatch":
return fnmatch.fnmatch(tag, pattern)
elif pattern_type == "re":
return bool(re.match(pattern, tag))
raise NotImplementedError
@export
def flatten_run_metadata(md):
# Flatten the tags field. Note this sets it to an empty string
# if it does not exist.
return strax.flatten_dict(
md, separator=".", keep=[strax.RUN_DEFAULTS_KEY, "sub_run_spec", "tags"]
)