forked from AxFoundation/strax
/
context.py
2163 lines (1911 loc) · 94.4 KB
/
context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import datetime
import logging
import warnings
import fnmatch
from functools import partial
import typing as ty
import time
import json
import numpy as np
import pandas as pd
import click
import deepdiff
import strax
import inspect
import types
from collections import defaultdict
from immutabledict import immutabledict
from enum import IntEnum
# Export machinery: names decorated with @export are collected into __all__.
export, __all__ = strax.exporter()
__all__ += ['RUN_DEFAULTS_KEY']
# Key under which per-run default options are stored in run documents.
RUN_DEFAULTS_KEY = 'strax_defaults'
# use tqdm as loaded in utils (from tqdm.notebook when in a jupyter env)
tqdm = strax.utils.tqdm
@strax.takes_config(
strax.Option(name='storage_converter', default=False, type=bool,
help='If True, save data that is loaded from one frontend '
'through all willing other storage frontends.'),
strax.Option(name='fuzzy_for', default=tuple(), type=tuple,
help='Tuple or string of plugin names for which no checks for version, '
'providing plugin, and config will be performed when '
'looking for data.'),
strax.Option(name='fuzzy_for_options', default=tuple(), type=tuple,
help='Tuple of config options for which no checks will be '
'performed when looking for data.'),
strax.Option(name='allow_incomplete', default=False, type=bool,
help="Allow loading of incompletely written data, if the "
"storage systems support it"),
strax.Option(name='allow_rechunk', default=True, type=bool,
help="Allow rechunking of data during writing."),
strax.Option(name='allow_multiprocess', default=False, type=bool,
help="Allow multiprocessing."
"If False, will use multithreading only."),
strax.Option(name='allow_shm', default=False, type=bool,
help="Allow use of /dev/shm for interprocess communication."),
strax.Option(name='allow_lazy', default=True, type=bool,
help='Allow "lazy" processing. Saves memory, but incompatible '
'with multiprocessing and perhaps slightly slower.'),
strax.Option(name='forbid_creation_of', default=tuple(), type=tuple,
help="If any of the following datatypes is requested to be "
"created, throw an error instead. Useful to limit "
"descending too far into the dependency graph."),
strax.Option(name='store_run_fields', default=tuple(), type=tuple,
help="Tuple of run document fields to store "
"during scan_run."),
strax.Option(name='check_available', default=tuple(), type=tuple,
help="Tuple of data types to scan availability for "
"during scan_run."),
strax.Option(name='max_messages', default=4, type=int,
help="Maximum number of mailbox messages, i.e. size of buffer "
"between plugins. Too high = RAM blows up. "
"Too low = likely deadlocks."),
strax.Option(name='timeout', default=24 * 3600, type=int,
help="Terminate processing if any one mailbox receives "
"no result for more than this many seconds"),
strax.Option(name='saver_timeout', default=900, type=int,
help="Max time [s] a saver can take to store a result. Set "
"high for slow compression algorithms."),
strax.Option(name='use_per_run_defaults', default=False, type=bool,
help='Scan the run db for per-run defaults. '
'This is an experimental strax feature that will '
'possibly be removed, see issue #246'),
strax.Option(name='free_options', default=tuple(), type=(tuple,list),
help='Do not warn if any of these options are passed, '
'even when no registered plugin takes them.'),
strax.Option(name='apply_data_function', default=tuple(),
type=(tuple, list, ty.Callable),
help='Apply a function to the data prior to returning the'
'data. The function should take three positional arguments: '
'func(<data>, <run_id>, <targets>).'),
strax.Option(name='write_superruns', default=False, type=bool,
help='If True, save superruns as rechunked "new" data.'),
)
@export
class Context:
    """Context for strax analysis.

    A context holds info on HOW to process data, such as which plugins provide
    what data types, where to store which results, and configuration options
    for the plugins.

    You start all strax processing through a context.
    """
    # Plugin configuration options, validated against each plugin's takes_config.
    config: dict
    # Context-level options, validated against Context.takes_config (see the
    # @strax.takes_config decorator on this class).
    context_config: dict

    # Cached run info (as a DataFrame); None until runs have been scanned.
    runs: ty.Union[pd.DataFrame, type(None)] = None
    # Cache of per-run default options: run_id -> defaults dict.
    _run_defaults_cache: dict = None
    # Cache of initialized plugins: {context_hash: {data_type: plugin}}.
    # Invalidated (replaced) whenever the context hash changes.
    _fixed_plugin_cache: dict = None

    storage: ty.List[strax.StorageFrontend]
def __init__(self,
             storage=None,
             config=None,
             register=None,
             register_all=None,
             **kwargs):
    """Create a strax context.

    :param storage: Storage front-ends to use. Can be:
        - None (default). Will use DataDirectory('./strax_data').
        - a string: path to use for DataDirectory frontend.
        - list/tuple, or single instance, of storage frontends.
    :param config: Dictionary with configuration options that will be
        applied to plugins
    :param register: plugin class or list of plugin classes to register
    :param register_all: module for which all plugin classes defined in it
        will be registered.

    Any additional kwargs are considered Context-specific options; see
    Context.takes_config.
    """
    self.log = logging.getLogger('strax')

    # Normalize the storage argument to a list of frontend instances;
    # bare path strings become DataDirectory frontends.
    if storage is None:
        storage = ['./strax_data']
    elif not isinstance(storage, (list, tuple)):
        storage = [storage]
    self.storage = [
        strax.DataDirectory(item) if isinstance(item, str) else item
        for item in storage]

    self._plugin_class_registry = dict()
    self._run_defaults_cache = dict()

    self.set_config(config, mode='replace')
    self.set_context_config(kwargs, mode='replace')

    if register_all is not None:
        self.register_all(register_all)
    if register is not None:
        self.register(register)
def new_context(self,
                storage=tuple(),
                config=None,
                register=None,
                register_all=None,
                replace=False,
                **kwargs):
    """Return a new context with new setting adding to those in
    this context.

    :param replace: If True, replaces settings rather than adding them.
    See Context.__init__ for documentation on other parameters.
    """
    if not isinstance(storage, (list, tuple)):
        storage = [storage]
    if config is None:
        config = dict()
    if register is None:
        register = []
    if not isinstance(register, (tuple, list)):
        register = [register]

    if not replace:
        storage = self.storage + list(storage)
        config = strax.combine_configs(self.config,
                                       config,
                                       mode='update')
        kwargs = strax.combine_configs(self.context_config,
                                       kwargs,
                                       mode='update')

    new_c = Context(storage=storage, config=config, **kwargs)
    if not replace:
        # Carry over this context's plugin registrations.
        new_c._plugin_class_registry = self._plugin_class_registry.copy()
    # Guard against None: register_all(None) would pointlessly scan
    # dir(None) for plugin classes; register([]) is a no-op loop.
    if register_all is not None:
        new_c.register_all(register_all)
    if register:
        new_c.register(register)
    return new_c
def set_config(self, config=None, mode='update'):
    """Set new configuration options

    :param config: dict of new options
    :param mode: can be either
        - update: Add to or override current options in context
        - setdefault: Add to current options, but do not override
        - replace: Erase config, then set only these options
    """
    # Lazily initialize on first call (e.g. during __init__).
    if not hasattr(self, 'config'):
        self.config = dict()
    merged = strax.combine_configs(
        old_config=self.config,
        new_config=config,
        mode=mode)
    self.config = merged
def set_context_config(self, context_config=None, mode='update'):
    """Set new context configuration options

    :param context_config: dict of new context configuration options
    :param mode: can be either
        - update: Add to or override current options in context
        - setdefault: Add to current options, but do not override
        - replace: Erase config, then set only these options
    """
    if not hasattr(self, 'context_config'):
        self.context_config = dict()

    new_config = strax.combine_configs(
        old_config=self.context_config,
        new_config=context_config,
        mode=mode)

    # Validate against the options declared in @strax.takes_config.
    for opt in self.takes_config.values():
        opt.validate(new_config)

    # Warn once per unrecognized option. (The previous version warned
    # twice for each unknown key, with two inconsistent messages.)
    for k in new_config:
        if k not in self.takes_config:
            self.log.warning(f"Unknown config option {k}; will do nothing.")

    self.context_config = new_config
def register(self, plugin_class):
    """Register plugin_class as provider for data types in provides.

    :param plugin_class: class inheriting from strax.Plugin.
    You can also pass a sequence of plugins to register, but then
    you must omit the provides argument.

    If a plugin class omits the .provides attribute, we will construct
    one from its class name (CamelCase -> snake_case)

    Returns plugin_class (so this can be used as a decorator)
    """
    if isinstance(plugin_class, (tuple, list)):
        # Shortcut for multiple registration
        for x in plugin_class:
            self.register(x)
        return

    if not hasattr(plugin_class, 'provides'):
        # No output name specified: construct one from the class name
        snake_name = strax.camel_to_snake(plugin_class.__name__)
        plugin_class.provides = (snake_name,)

    # Ensure plugin_class.provides is a tuple
    if isinstance(plugin_class.provides, str):
        plugin_class.provides = tuple([plugin_class.provides])

    # Later registrations silently override earlier providers of the
    # same data type.
    for p in plugin_class.provides:
        self._plugin_class_registry[p] = plugin_class

    # Consistency check: two registered plugins sharing an option name
    # must agree on its default value.
    already_seen = []
    for plugin in self._plugin_class_registry.values():
        if plugin in already_seen:
            # Multi-output plugins appear under several keys; check once.
            continue
        already_seen.append(plugin)

        for option, items in plugin.takes_config.items():
            self._per_run_default_allowed_check(option, items)
            try:
                # Looping over the options of the new plugin and check if
                # they can be found in the already registered plugins:
                for new_option, new_items in plugin_class.takes_config.items():
                    if not new_option == option:
                        continue
                    default = items.get_default('0')  # Have to pass will be changed.
                    new_default = new_items.get_default('0')
                    if default == new_default:
                        continue
                    else:
                        mes = (f'Two plugins have a different default value'
                               f' for the same option. The option'
                               f' "{new_option}" in "{plugin.__name__}" takes'
                               f' as a default "{default}" while in'
                               f' "{plugin_class.__name__}" the default value'
                               f' is set to "{new_default}". Please change'
                               ' one of the defaults.'
                               )
                        raise ValueError(mes)
            except strax.InvalidConfiguration:
                # These are option which are inherited from context options.
                pass

    return plugin_class
def deregister_plugins_with_missing_dependencies(self):
    """Deregister plugins in case a data_type the plugin
    depends on is not provided by any other plugin.
    """
    # Removing a plugin can orphan others, so iterate to a fixed point.
    while True:
        available = set()
        for plugin in self._plugin_class_registry.values():
            available |= set(plugin.provides)

        to_remove = [
            key for key, plugin in self._plugin_class_registry.items()
            if not set(strax.to_str_tuple(plugin.depends_on)).issubset(available)]

        if not to_remove:
            break
        for key in to_remove:
            self.log.info(f'Deregister {key}')
            del self._plugin_class_registry[key]
def search_field(self,
                 pattern: str,
                 include_code_usage: bool = True,
                 return_matches: bool = False,
                 ) -> ty.Union[None, ty.Tuple[defaultdict, dict]]:
    """Find and print which plugin(s) provides a field that matches
    pattern (fnmatch).

    :param pattern: pattern to match, e.g. 'time' or 'tim*'
    :param include_code_usage: Also include the code occurrences of
        the fields that match the pattern.
    :param return_matches: If set, return a dictionary with the
        matching fields and the occurrences in code.
    :return: when return_matches is set, return a dictionary with
        the matching fields and the occurrences in code. Otherwise,
        we are not returning anything and just print the results
    """
    cache = dict()  # data_type -> plugin instance, filled lazily
    field_matches = defaultdict(list)  # field name -> [(data_type, plugin class name)]
    code_matches = dict()  # field name -> list of 'Plugin.method' usages

    for data_type in sorted(list(self._plugin_class_registry.keys())):
        if data_type not in cache:
            # run_id '0' is a dummy here; we only need the dtype.
            cache.update(self._get_plugins((data_type,), run_id='0'))
        plugin = cache[data_type]

        for field_name in plugin.dtype_for(data_type).names:
            if fnmatch.fnmatch(field_name, pattern):
                field_matches[field_name].append((data_type, plugin.__class__.__name__))
                if field_name in code_matches:
                    continue

                # we need to do this for 'field_name' rather than pattern
                # since we want an exact match (otherwise too fuzzy with
                # comments etc.) Do this once, for all the plugins.
                fields_used = self.search_field_usage(field_name, plugin=None)
                if include_code_usage and fields_used:
                    code_matches[field_name] = fields_used

    if return_matches:
        return field_matches, code_matches

    # Print the results and return nothing
    for field_name, matches in field_matches.items():
        print()
        for data_type, name in matches:
            print(f"{field_name} is part of {data_type} (provided by {name})")
    for field_name, functions in code_matches.items():
        print()
        for function in functions:
            print(f"{field_name} is used in {function}")
def search_field_usage(self,
                       search_string: str,
                       plugin: ty.Union[strax.Plugin, ty.List[strax.Plugin], None] = None
                       ) -> ty.List[str]:
    """Find and return which plugin(s) use a given field.

    :param search_string: a field that matches pattern exact
    :param plugin: plugin where to look for a field
    :return: list of code occurrences in the form of PLUGIN.FUNCTION
    """
    if plugin is None:
        # Default: search the source of every registered plugin.
        plugin = list(self._plugin_class_registry.values())
    if not isinstance(plugin, (list, tuple)):
        plugin = [plugin]

    result = []
    for plug in plugin:
        # Scan each function defined directly on the plugin for a
        # literal occurrence of search_string in its source text.
        for attribute_name, class_attribute in plug.__dict__.items():
            is_function = isinstance(class_attribute, types.FunctionType)
            if is_function:
                for line in inspect.getsource(class_attribute).split('\n'):
                    if search_string in line:
                        if plug.__class__.__name__ == 'type':
                            # Make sure we have the instance, not the class:
                            # >>> class A: pass
                            # >>> A.__class__.__name__
                            # 'type'
                            # >>> A().__class__.__name__
                            # 'A'
                            plug = plug()
                        result += [f'{plug.__class__.__name__}.{attribute_name}']
                        # Likely to be used several other times
                        break
    return result
def show_config(self, data_type=None, pattern='*', run_id='9' * 20):
    """Return configuration options that affect data_type.

    :param data_type: Data type name
    :param pattern: Show only options that match (fnmatch) pattern
    :param run_id: Run id to use for run-dependent config options.
        If omitted, will show defaults active for new runs.
    """
    rows = []
    if data_type is None:
        # No data type given: show the context's own options.
        iterator = [['Context', self]]
    else:
        iterator = self._get_plugins((data_type,), run_id).items()

    handled = []
    for label, owner in iterator:
        # A multi-output plugin appears under several keys;
        # report its options only once.
        if owner in handled:
            continue
        handled.append(owner)
        for opt in owner.takes_config.values():
            if not fnmatch.fnmatch(opt.name, pattern):
                continue
            try:
                default = opt.get_default(run_id, self.run_defaults(run_id))
            except strax.InvalidConfiguration:
                default = strax.OMITTED
            current_source = self.context_config if data_type is None else self.config
            rows.append(dict(
                option=opt.name,
                default=default,
                current=current_source.get(opt.name, strax.OMITTED),
                applies_to=(owner.provides if label != 'Context' else label),
                help=opt.help))
    if rows:
        return pd.DataFrame(rows, columns=rows[0].keys())
    return pd.DataFrame([])
def lineage(self, run_id, data_type):
    """Return lineage dictionary for data_type and run_id, based on the
    options in this context.
    """
    plugin = self._get_plugins((data_type,), run_id)[data_type]
    return plugin.lineage
def register_all(self, module):
    """Register all plugins defined in module.

    Can pass a list/tuple of modules to register all in each.
    """
    if isinstance(module, (tuple, list)):
        # Shortcut for multiple registration
        for x in module:
            self.register_all(x)
        return

    for name in dir(module):
        obj = getattr(module, name)
        # Only classes can be plugins. (The original spelled this as
        # isinstance(obj, type(type)), which is the same object as type.)
        if not isinstance(obj, type):
            continue
        if issubclass(obj, strax.Plugin):
            self.register(obj)
def data_info(self, data_name: str) -> pd.DataFrame:
    """Return pandas DataFrame describing fields in data_name"""
    plugin = self._get_plugins((data_name,), run_id='0')[data_name]
    rows = []
    for name, dtype in strax.utils.unpack_dtype(plugin.dtype_for(data_name)):
        # A (title, name) tuple means the field carries a human-readable
        # title; use it as the comment column.
        title = ''
        if isinstance(name, tuple):
            title, name = name
        rows.append([name, dtype, title])
    return pd.DataFrame(rows, columns=['Field name', 'Data type', 'Comment'])
def get_single_plugin(self, run_id, data_name):
    """Return a single fully initialized plugin that produces
    data_name for run_id. For use in custom processing.
    """
    p = self._get_plugins((data_name,), run_id)[data_name]
    # Strict config validation here (tolerant=False): the plugin
    # will really be used, so all required options must resolve.
    self._set_plugin_config(p, run_id, tolerant=False)
    p.setup()
    return p
def _set_plugin_config(self, p, run_id, tolerant=True):
    """Validate the context config against plugin p's options and
    attach the relevant subset as p.config.

    :param p: plugin INSTANCE (not class) to configure
    :param run_id: run id, used to resolve run-dependent defaults
    :param tolerant: if True, ignore InvalidConfiguration errors
        (used while building the dependency graph, when we don't yet
        know whether the plugin will actually be needed).
    """
    # Explicit type check, since if someone calls this with
    # plugin CLASSES, funny business might ensue
    assert isinstance(p, strax.Plugin)

    config = self.config.copy()
    for opt in p.takes_config.values():
        try:
            opt.validate(config,
                         run_id=run_id,
                         run_defaults=self.run_defaults(run_id))
        except strax.InvalidConfiguration:
            if not tolerant:
                raise

    p.config = {k: v for k, v in config.items()
                if k in p.takes_config}

    if p.child_plugin:
        # This plugin is a child of another plugin. This means we have to overwrite
        # the registered option settings in p.config with the options specified by the
        # child. This is required since the super().compute() method in a child plugins
        # will still point to the option names of the parent (e.g. self.config['parent_name']).
        # options to pass. So update parent config according to child:
        for option_name, opt in p.takes_config.items():
            # Now loop again overall options for this plugin (parent + child)
            # and get all child options:
            if opt.child_option:
                # See if option is tagged as a child option. In that case replace the
                # config value of the parent with the value of the child
                option_value = config[option_name]
                parent_name = opt.parent_option_name

                mes = (f'Cannot find "{parent_name}" among the options of the parent.'
                       f' Either you specified by accident {option_name} as child option'
                       f' or you specified the wrong parent_option_name. Have you specified '
                       'the correct parent option name?')
                assert parent_name in p.config, mes
                p.config[parent_name] = option_value
def _context_hash(self):
    """Dump the current config + plugin class registry to a hash as a
    sanity check for building the _fixed_plugin_cache. If any item
    changes in the config, so does this hash.
    """
    hashed_items = self.config.copy()
    # Fold in each registered plugin's version and I/O settings,
    # skipping temporary (_temp_*) data types.
    for data_type, plugin in self._plugin_class_registry.items():
        if data_type.startswith('_temp_'):
            continue
        hashed_items[data_type] = (
            plugin.__version__, plugin.compressor, plugin.input_timeout)
    return strax.deterministic_hash(hashed_items)
def _plugins_are_cached(self, targets: ty.Tuple[str],) -> bool:
    """Check if all the requested targets are in the _fixed_plugin_cache

    :param targets: data type names to look up
    :return: True only if every target is cached under the current
        context hash.
    """
    if self.context_config['use_per_run_defaults'] or self._fixed_plugin_cache is None:
        # There is no point in caching if plugins (lineage) can
        # change per run or the cache is empty.
        return False

    context_hash = self._context_hash()
    if context_hash not in self._fixed_plugin_cache:
        return False
    plugin_cache = self._fixed_plugin_cache[context_hash]
    # Generator instead of a throwaway list (ruff C419); also
    # short-circuits on the first missing target.
    return all(t in plugin_cache for t in targets)
def _plugins_to_cache(self, plugins: dict) -> None:
    """Store initialized plugin instances in the per-context-hash cache."""
    if self.context_config['use_per_run_defaults']:
        # There is no point in caching if plugins (lineage) can change per run
        return

    context_hash = self._context_hash()
    cache = self._fixed_plugin_cache
    if cache is None or context_hash not in cache:
        # Keep only one hash's worth of plugins to save memory; if a
        # config changes, rebuilding the cache is cheap anyway.
        if cache is not None:
            self.log.info('Replacing context._fixed_plugin_cache since '
                          'plugins/versions changed')
        self._fixed_plugin_cache = {context_hash: dict()}

    self._fixed_plugin_cache[context_hash].update(plugins)
def _fix_dependency(self, plugin_registry: dict, end_plugin: str):
    """Starting from end-plugin, fix the dtype until there is nothing
    left to fix. Keep in mind that dtypes can be chained.
    """
    # Depth-first: every dependency's dtype must be fixed before the
    # dependent plugin's own fix_dtype() runs.
    for dependency in plugin_registry[end_plugin].depends_on:
        self._fix_dependency(plugin_registry, dependency)
    plugin_registry[end_plugin].fix_dtype()
def __get_plugins_from_cache(self,
                             run_id: str) -> ty.Dict[str, strax.Plugin]:
    # Doubly underscored since we don't do any key-checks etc here
    """Load requested plugins from the plugin_cache.

    Copies each cached plugin (so state is not shared between runs),
    then re-links the .deps of each copy and re-derives the dtypes.
    """
    requested_plugins = {}
    for target, plugin in self._fixed_plugin_cache[self._context_hash()].items():
        if target in requested_plugins:
            # If e.g. target is already seen because the plugin is
            # multi output
            continue

        # Shallow-copy the cached instance so per-run attributes
        # don't leak between runs.
        requested_p = plugin.__copy__()
        requested_p.run_id = run_id

        # Re-use only one instance if the plugin is multi output
        for provides in strax.to_str_tuple(requested_p.provides):
            requested_plugins[provides] = requested_p

    # At this stage, all the plugins should be in requested_plugins
    # To prevent infinite copying, we are only now linking the
    # dependencies of each plugin to another where needed.
    for target, plugin in requested_plugins.items():
        plugin.deps = {dependency: requested_plugins[dependency]
                       for dependency in plugin.depends_on
                       }

    # Finally, fix the dtype. Since infer_dtype may depend on the
    # entire deps chain, we need to start at the last plugin and go
    # all the way down to the lowest level.
    for final_plugins in self._get_end_targets(requested_plugins):
        self._fix_dependency(requested_plugins, final_plugins)
    return requested_plugins
def _get_plugins(self,
                 targets: ty.Tuple[str],
                 run_id: str) -> ty.Dict[str, strax.Plugin]:
    """Return dictionary of plugin instances necessary to compute targets
    from scratch.

    For a plugin that produces multiple outputs, we make only a single
    instance, which is referenced under multiple keys in the output dict.
    """
    if self._plugins_are_cached(targets):
        return self.__get_plugins_from_cache(run_id)

    # Check all config options are taken by some registered plugin class
    # (helps spot typos)
    all_opts = set().union(*[
        pc.takes_config.keys()
        for pc in self._plugin_class_registry.values()])
    for k in self.config:
        if not (k in all_opts or k in self.context_config['free_options']):
            self.log.warning(f"Option {k} not taken by any registered plugin")

    # Initialize plugins for the entire computation graph
    # (most likely far further down than we need)
    # to get lineages and dependency info.
    def get_plugin(data_type):
        # Recursively instantiate the plugin providing data_type and all
        # of its dependencies, recording every provided type in
        # non_local_plugins along the way.
        nonlocal non_local_plugins

        if data_type not in self._plugin_class_registry:
            raise KeyError(f"No plugin class registered that provides {data_type}")

        plugin = self._plugin_class_registry[data_type]()

        d_provides = None  # just to make codefactor happy
        for d_provides in plugin.provides:
            non_local_plugins[d_provides] = plugin

        plugin.run_id = run_id

        # The plugin may not get all the required options here
        # but we don't know if we need the plugin yet
        self._set_plugin_config(plugin, run_id, tolerant=True)

        plugin.deps = {d_depends: get_plugin(d_depends) for d_depends in plugin.depends_on}

        # The lineage is keyed on the plugin's LAST provides name.
        last_provide = d_provides

        if plugin.child_plugin:
            # Plugin is a child of another plugin, hence we have to
            # drop the parents config from the lineage
            configs = {}

            # Getting information about the parent:
            parent_class = plugin.__class__.__bases__[0]
            # Get all parent options which are overwritten by a child:
            parent_options = [option.parent_option_name for option in plugin.takes_config.values()
                              if option.child_option]

            for option_name, v in plugin.config.items():
                # Looping over all settings, option_name is either the option name of the
                # parent or the child.
                if option_name in parent_options:
                    # In case it is the parent we continue
                    continue
                if plugin.takes_config[option_name].track:
                    # Add all options which should be tracked:
                    configs[option_name] = v

            # Also adding name and version of the parent to the lineage:
            configs[parent_class.__name__] = parent_class.__version__

            plugin.lineage = {last_provide: (
                plugin.__class__.__name__,
                plugin.version(run_id),
                configs)}
        else:
            plugin.lineage = {last_provide: (
                plugin.__class__.__name__,
                plugin.version(run_id),
                {option: setting for option, setting
                 in plugin.config.items()
                 if plugin.takes_config[option].track})}

        # Merge in the lineages of all dependencies.
        for d_depends in plugin.depends_on:
            plugin.lineage.update(plugin.deps[d_depends].lineage)

        if not hasattr(plugin, 'data_kind') and not plugin.multi_output:
            if len(plugin.depends_on):
                # Assume data kind is the same as the first dependency
                first_dep = plugin.depends_on[0]
                plugin.data_kind = plugin.deps[first_dep].data_kind_for(first_dep)
            else:
                # No dependencies: assume provided data kind and
                # data type are synonymous
                plugin.data_kind = plugin.provides[0]

        plugin.fix_dtype()

        return plugin

    non_local_plugins = {}
    for t in targets:
        p = get_plugin(t)
        non_local_plugins[t] = p

    self._plugins_to_cache(non_local_plugins)

    return non_local_plugins
def _per_run_default_allowed_check(self, option_name, option):
    """Check if an option of a registered plugin is allowed"""
    has_per_run_default = option.default_by_run != strax.OMITTED
    is_not_overwritten = option_name not in self.config
    forbids_per_run = not self.context_config['use_per_run_defaults']
    if has_per_run_default and is_not_overwritten and forbids_per_run:
        raise strax.InvalidConfiguration(
            f'{option_name} is specified as a per-run-default which is not '
            f'allowed by the context')
@staticmethod
def _get_end_targets(plugins: dict) -> ty.Tuple[str]:
    """Get the datatype that is provided by a plugin but not depended
    on by any other plugin
    """
    provides = set()
    depends_on = set()
    for p in plugins.values():
        provides.update(strax.to_str_tuple(p.provides))
        depends_on.update(strax.to_str_tuple(p.depends_on))
    # Symmetric difference: in a fully-resolved graph every dependency
    # is also provided, so this leaves only the "leaf" outputs.
    return strax.to_str_tuple(list(provides ^ depends_on))
@property
def _find_options(self):
    """Options passed to StorageFrontend.find and friends.

    The plugin settings in the lineage are stored under the plugin's
    LAST provides name, which can be confusing: e.g. to be fuzzy for
    the peaklets settings the user has to specify
    fuzzy_for=('lone_hits',). Translate the user-specified data type
    names to those keys here, so no data has to be reprocessed.
    """
    fuzzy_keys = strax.to_str_tuple(self.context_config['fuzzy_for'])
    translated = tuple(
        self._plugin_class_registry[key].provides[-1]
        for key in fuzzy_keys)
    return dict(fuzzy_for=translated,
                fuzzy_for_options=self.context_config['fuzzy_for_options'],
                allow_incomplete=self.context_config['allow_incomplete'])
@property
def _sorted_storage(self) -> ty.List[strax.StorageFrontend]:
    """Simple ordering of the storage frontends on the fly when e.g.
    looking for data. This allows us to use the simple self.storage
    as a simple list without asking users to keep any particular
    order in mind. Return the fastest first and try loading from it
    """
    def by_storage_type(frontend):
        return frontend.storage_type
    return sorted(self.storage, key=by_storage_type)
def _get_partial_loader_for(self, key, time_range=None, chunk_number=None):
    """Get partial loaders to allow loading data later

    :param key: strax.DataKey
    :param time_range: 2-length arraylike of (start, exclusive end) of row
        numbers to get. Default is None, which means get the entire run.
    :param chunk_number: number of the chunk for data specified by
        strax.DataKey. This chunck is loaded exclusively.
    :return: partial object
    """
    for frontend in self._sorted_storage:
        # Partial is clunky... but allows specifying executor later
        # Since it doesn't run until later, we must do a find now
        # that we can still handle DataNotAvailable
        try:
            frontend.find(key, **self._find_options)
        except strax.DataNotAvailable:
            continue
        return partial(frontend.loader,
                       key,
                       time_range=time_range,
                       chunk_number=chunk_number,
                       **self._find_options)
    # No frontend has this data.
    return False
def get_components(self, run_id: str,
targets=tuple(), save=tuple(),
time_range=None, chunk_number=None,
) -> strax.ProcessorComponents:
"""
Return components for setting up a processor
{get_docs}
"""
save = strax.to_str_tuple(save)
targets = strax.to_str_tuple(targets)
for t in targets:
if len(t) == 1:
raise ValueError(f"Plugin names must be more than one letter, not {t}")
plugins = self._get_plugins(targets, run_id)
# Get savers/loaders, and meanwhile filter out plugins that do not
# have to do computation. (their instances will stick around
# though the .deps attribute of plugins that do)
loaders = dict()
loader_plugins = dict()
savers = dict()
seen = set()
to_compute = dict()
def check_cache(target_i):
"""For some target, add loaders, and savers where appropriate"""
nonlocal plugins, loaders, savers, seen
if target_i in seen:
return
seen.add(target_i)
target_plugin = plugins[target_i]
# Can we load this data?
loading_this_data = False
key = self.key_for(run_id, target_i)
loader = self._get_partial_loader_for(
key,
chunk_number=chunk_number,
time_range=time_range)
_is_superrun = (run_id.startswith('_') and
not target_plugin.provides[0].startswith('_temp'))
if not loader and _is_superrun:
if time_range is not None:
raise NotImplementedError("time range loading not yet "
"supported for superruns")
sub_run_spec = self.run_metadata(
run_id, 'sub_run_spec')['sub_run_spec']
# Make subruns if they do not exist.
self.make(list(sub_run_spec.keys()), target_i, save=(target_i,))
ldrs = []
for subrun in sub_run_spec:
sub_key = self.key_for(subrun, target_i)
if sub_run_spec[subrun] == 'all':
_subrun_time_range = None
else:
_subrun_time_range = sub_run_spec[subrun]
loader = self._get_partial_loader_for(
sub_key,
time_range=_subrun_time_range,
chunk_number=chunk_number)
if not loader:
raise RuntimeError(
f"Could not load {target_i} for subrun {subrun} "
"even though we made it? Is the plugin "
"you are requesting a SaveWhen.NEVER-plguin?")
ldrs.append(loader)
def concat_loader(*args, **kwargs):
for x in ldrs:
yield from x(*args, **kwargs)
# pylint: disable=unnecessary-lambda
loader = lambda *args, **kwargs: concat_loader(*args, **kwargs)
if loader:
# Found it! No need to make it or look in other frontends
loading_this_data = True
loaders[target_i] = loader
loader_plugins[target_i] = target_plugin
del plugins[target_i]
else:
# Data not found anywhere. We will be computing it.
self._check_forbidden()
if (time_range is not None
and target_plugin.save_when[target_i] > strax.SaveWhen.EXPLICIT):
# While the data type providing the time information is
# available (else we'd have failed earlier), one of the
# other requested data types is not.
error_message = (
f"Time range selection assumes data is already available,"
f" but {target_i} for {run_id} is not.")
if target_plugin.save_when[target_i] == strax.SaveWhen.TARGET:
error_message += (f"\nFirst run st.make({run_id}, "
f"{target_i}) to make {target_i}.")
raise strax.DataNotAvailable(error_message)
if '*' in self.context_config['forbid_creation_of']:
raise strax.DataNotAvailable(
f"{target_i} for {run_id} not found in any storage, and "
"your context specifies no new data can be created.")
if target_i in self.context_config['forbid_creation_of']:
raise strax.DataNotAvailable(
f"{target_i} for {run_id} not found in any storage, and "
"your context specifies it cannot be created.")
to_compute[target_i] = target_plugin
for dep_d in target_plugin.depends_on:
check_cache(dep_d)
if self.context_config['storage_converter']:
warnings.warn('The storage converter mode will be replaced by "copy_to_frontend" soon. '
'It will be removed in one of the future releases. Please let us know if '
'you are still using the "storage_converter" option.', DeprecationWarning)
# Should we save this data? If not, return.
_can_store_superrun = (self.context_config['write_superruns'] and _is_superrun)
# In case we can load the data already we want either use the storage converter
# or make a new superrun.
if (loading_this_data
and not self.context_config['storage_converter']
and not _can_store_superrun):
return
# Now we should check whether we meet the saving requirements (Explicit, Target etc.)
# In case of the storage converter mode we copy already existing data. So we do not
# have to check for the saving requirements here.
current_plugin_to_savers = [target_i]
if (not self._target_should_be_saved(
target_plugin, target_i, targets, save, loader, _is_superrun)
and not self.context_config['storage_converter']):
if len(target_plugin.provides) > 1:
# In case the plugin has more then a single provides we also have to check
# whether any of the other data_types should be stored. Hence only remove
# the current traget from the list of plugins_to_savers.
current_plugin_to_savers = []
else:
# In case of a single-provide plugin we can return now.
return
# Warn about conditions that preclude saving, but the user
# might not expect.
if time_range is not None:
# We're not even getting the whole data.
# Without this check, saving could be attempted if the
# storage converter mode is enabled.
self.log.warning(f"Not saving {target_i} while "
f"selecting a time range in the run")
return
if any([len(v) > 0
for k, v in self._find_options.items()
if 'fuzzy' in k]):
# In fuzzy matching mode, we cannot (yet) derive the
# lineage of any data we are creating. To avoid creating
# false data entries, we currently do not save at all.
self.log.warning(f"Not saving {target_i} while fuzzy matching is"
f" turned on.")
return
if self.context_config['allow_incomplete']:
self.log.warning(f"Not saving {target_i} while loading incomplete"
f" data is allowed.")
return
# Save the target and any other outputs of the plugin.
if _is_superrun:
# In case of a superrun we are only interested in the specified targets
# and not any other stuff provided by the corresponding plugin.
savers = self._add_saver(savers, target_i, target_plugin,
_is_superrun, loading_this_data)
else:
for d_to_save in set(current_plugin_to_savers + list(target_plugin.provides)):
key = self.key_for(run_id, d_to_save)
loader = self._get_partial_loader_for(key,
time_range=time_range,
chunk_number=chunk_number)
if ((not self._target_should_be_saved(
target_plugin, d_to_save, targets, save, loader, _is_superrun)
and not self.context_config['storage_converter'])
or savers.get(d_to_save)):
# This multi-output plugin was scanned before
# let's not create doubled savers or store data_types we do not want to.