-
Notifications
You must be signed in to change notification settings - Fork 31
/
__init__.py
2083 lines (1783 loc) · 78.3 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from collections import defaultdict, deque, namedtuple
import collections.abc
import copy
import json
from enum import Enum
from functools import partial
import itertools
import os
from pkg_resources import resource_filename as rs_fn
import threading
import time as ttime
import uuid
import warnings
import jsonschema
import numpy
from ._version import get_versions
__all__ = ['DocumentNames', 'schemas', 'schema_validators', 'compose_run']
class DocumentNames(Enum):
    """Enumeration of the document types defined by the event-model.

    The value of each member is the string name used to label documents
    as they are emitted/routed (e.g. the ``name`` in ``router(name, doc)``).
    """
    stop = 'stop'
    start = 'start'
    descriptor = 'descriptor'
    event = 'event'
    datum = 'datum'
    resource = 'resource'
    event_page = 'event_page'
    datum_page = 'datum_page'
    bulk_datum = 'bulk_datum'  # deprecated
    bulk_events = 'bulk_events'  # deprecated
class DocumentRouter:
    """
    Route each document by type to a corresponding method.

    When an instance is called with a document type and a document like::

        router(name, doc)

    the document is passed to the method of the corresponding name, as in::

        getattr(router, name)(doc)

    The method is expected to return ``None`` or a valid document of the same
    type. It may be the original instance (passed through), a copy, or a
    different dict altogether.

    Finally, the call to ``router(name, doc)`` returns::

        (name, getattr(router, name)(doc))
    """
    def __call__(self, name, doc, validate=False):
        """
        Process a document.

        Parameters
        ----------
        name : string
        doc : dict
        validate : boolean
            Apply jsonschema validation to the documents coming *out*. This is
            False by default.

        Returns
        -------
        name, output_doc : string, dict
            The same name as what was passed in, and a doc that may be the same
            instance as doc, a copy of doc, or a different dict altogether.
        """
        return self._dispatch(name, doc, validate)

    def _dispatch(self, name, doc, validate):
        """
        Dispatch to the method corresponding to the `name`.

        Optionally validate that the result is still a valid document.
        """
        output_doc = getattr(self, name)(doc)
        # If 'event' is not defined by the subclass but 'event_page' is, or
        # vice versa, use that. And the same for 'datum_page' / 'datum'.
        if output_doc is NotImplemented:
            if name == 'event':
                # Promote the single Event to a page and try the subclass's
                # event_page hook instead.
                event_page = pack_event_page(doc)
                # Subclass' implementation of event_page may return a valid
                # EventPage or None or NotImplemented.
                output_event_page = self.event_page(event_page)
                output_event_page = output_event_page if output_event_page is not None else event_page
                if output_event_page is not NotImplemented:
                    # The page contains exactly one Event; unpack it back.
                    output_doc, = unpack_event_page(output_event_page)
            elif name == 'datum':
                # Same promotion strategy as 'event', for Datum documents.
                datum_page = pack_datum_page(doc)
                # Subclass' implementation of datum_page may return a valid
                # DatumPage or None or NotImplemented.
                output_datum_page = self.datum_page(datum_page)
                output_datum_page = output_datum_page if output_datum_page is not None else datum_page
                if output_datum_page is not NotImplemented:
                    output_doc, = unpack_datum_page(output_datum_page)
            elif name == 'event_page':
                # Demote the page to individual Events and try the subclass's
                # event hook on each one.
                output_events = []
                for event in unpack_event_page(doc):
                    # Subclass' implementation of event may return a valid
                    # Event or None or NotImplemented.
                    output_event = self.event(event)
                    output_event = output_event if output_event is not None else event
                    if output_event is NotImplemented:
                        break
                    output_events.append(output_event)
                else:
                    # Only reached if every Event was handled (no break):
                    # repack the processed Events into a page.
                    output_doc = pack_event_page(*output_events)
            elif name == 'datum_page':
                # Same demotion strategy as 'event_page', for DatumPages.
                output_datums = []
                for datum in unpack_datum_page(doc):
                    # Subclass' implementation of datum may return a valid
                    # Datum or None or NotImplemented.
                    output_datum = self.datum(datum)
                    output_datum = output_datum if output_datum is not None else datum
                    if output_datum is NotImplemented:
                        break
                    output_datums.append(output_datum)
                else:
                    output_doc = pack_datum_page(*output_datums)
        # If we still don't find an implemented method by here, then pass the
        # original document through.
        if output_doc is NotImplemented:
            output_doc = doc
        if validate:
            schema_validators[getattr(DocumentNames, name)].validate(output_doc)
        return (name, output_doc if output_doc is not None else doc)

    # The methods below return NotImplemented, a built-in Python constant.
    # Note that it is not interchangeable with NotImplementedError. See docs at
    # https://docs.python.org/3/library/constants.html#NotImplemented
    # It is used here so that _dispatch, defined above, can detect whether a
    # subclass implements event, event_page, both, or neither. This is similar
    # to how Python uses NotImplemented in arithmetic operations, as described
    # in the documentation.
    def start(self, doc):
        return NotImplemented

    def stop(self, doc):
        return NotImplemented

    def descriptor(self, doc):
        return NotImplemented

    def resource(self, doc):
        return NotImplemented

    def event(self, doc):
        return NotImplemented

    def datum(self, doc):
        return NotImplemented

    def event_page(self, doc):
        return NotImplemented

    def datum_page(self, doc):
        return NotImplemented

    def bulk_events(self, doc):
        # Do not modify this in a subclass. Use event_page.
        warnings.warn(
            "The document type 'bulk_events' has been deprecated in favor of "
            "'event_page', whose structure is a transpose of 'bulk_events'.")
        for page in bulk_events_to_event_pages(doc):
            self.event_page(page)

    def bulk_datum(self, doc):
        # Do not modify this in a subclass. Use datum_page.
        warnings.warn(
            "The document type 'bulk_datum' has been deprecated in favor of "
            "'datum_page', whose structure is a transpose of 'bulk_datum'.")
        self.datum_page(bulk_datum_to_datum_page(doc))
class SingleRunDocumentRouter(DocumentRouter):
    """
    A DocumentRouter intended to process events from exactly one run.
    """
    def __init__(self):
        super().__init__()
        # The start document of the one run this router serves; None until a
        # 'start' document has been processed.
        self._start_doc = None
        # Cache of descriptor documents for this run, keyed on descriptor uid.
        self._descriptors = dict()

    def __call__(self, name, doc, validate=False):
        """
        Process a document.

        Also, track of the start document and descriptor documents
        passed to this SingleRunDocumentRouter in caches.

        Parameters
        ----------
        name : string
        doc : dict
        validate : boolean
            Apply jsonschema validation to the documents coming *out*. This is
            False by default.

        Returns
        -------
        name, output_doc : string, dict
            The same name as what was passed in, and a doc that may be the same
            instance as doc, a copy of doc, or a different dict altogether.

        Raises
        ------
        EventModelValueError
            If a second start document is received, or if a descriptor is
            received before any start document or belongs to a different run.
        """
        if name == 'start':
            if self._start_doc is None:
                self._start_doc = doc
            else:
                raise EventModelValueError(
                    f'SingleRunDocumentRouter associated with start document {self._start_doc["uid"]} '
                    f'received a second start document with uid {doc["uid"]}'
                )
        elif name == 'descriptor':
            # BUG FIX: previously a descriptor received before any start
            # document crashed with TypeError ("'NoneType' object is not
            # subscriptable") at self._start_doc['uid']; raise a clear,
            # catchable error instead.
            if self._start_doc is None:
                raise EventModelValueError(
                    f'SingleRunDocumentRouter received a descriptor with '
                    f'uid {doc["uid"]} before any start document'
                )
            if doc['run_start'] == self._start_doc['uid']:
                self._descriptors[doc['uid']] = doc
            else:
                raise EventModelValueError(
                    f'SingleRunDocumentRouter associated with start document {self._start_doc["uid"]} '
                    f'received a descriptor {doc["uid"]} associated with start document {doc["run_start"]}'
                )
        # Defer to superclass for dispatch/processing.
        return super().__call__(name, doc, validate)

    def get_start(self):
        """Convenience method returning the start document for the associated run.

        If no start document has been processed EventModelError will be raised.

        Returns
        -------
        start document : dict
        """
        if self._start_doc is None:
            raise EventModelError(
                'SingleRunDocumentRouter has not processed a start document yet')
        return self._start_doc

    def get_descriptor(self, doc):
        """Convenience method returning the descriptor associated with the specified document.

        Parameters
        ----------
        doc : dict
            event-model document

        Returns
        -------
        descriptor document : dict
        """
        if 'descriptor' not in doc:
            raise EventModelValueError(
                f'document is not associated with a descriptor:\n{doc}')
        elif doc['descriptor'] not in self._descriptors:
            raise EventModelValueError(
                f'SingleRunDocumentRouter has not processed a descriptor with uid {doc["descriptor"]}'
            )
        return self._descriptors[doc['descriptor']]

    def get_stream_name(self, doc):
        """Convenience method returning the name of the stream for the specified document.

        Parameters
        ----------
        doc : dict
            event-model document

        Returns
        -------
        stream name : str
        """
        return self.get_descriptor(doc).get('name')
class HandlerRegistryView(collections.abc.Mapping):
    """A read-only Mapping view onto a Filler's handler registry.

    Lookup, iteration, and length delegate to the underlying registry dict.
    Mutation is refused; callers must go through Filler.register_handler /
    Filler.deregister_handler so the Filler can keep its caches consistent.
    """

    def __init__(self, handler_registry):
        self._handler_registry = handler_registry

    def __repr__(self):
        return f"HandlerRegistryView({self._handler_registry!r})"

    def __getitem__(self, key):
        return self._handler_registry[key]

    def __iter__(self):
        return iter(self._handler_registry)

    def __len__(self):
        return len(self._handler_registry)

    def __setitem__(self, key, value):
        raise EventModelTypeError(
            "The handler registry cannot be edited directly. "
            "Instead, use the method Filler.register_handler.")

    def __delitem__(self, key):
        raise EventModelTypeError(
            "The handler registry cannot be edited directly. "
            "Instead, use the method Filler.deregister_handler.")
# A "coersion funcion" is a hook that Filler can use to, for example, ensure
# all the external data read in my handlers is an *actual* numpy array as
# opposed to some other array-like such as h5py.Dataset or dask.array.Array,
# or wrap every result is dask.array.from_array(...).
#
# It has access to the handler_class as it is registered and to some state
# provided by the Filler (more on that below). It is expected to return
# something that is API-compatible with handler_class. That might be
# handler_class itself (a no-op), a subclass, or an altogether different class
# with the same API. See example below.
#
# The "state provided by the Filler", mentioned above is passed into the
# coersion functions below as ``filler_state``. It is a namespace containing
# information that may be useful for the coersion functions. Currently, it has
# ``filler_state.descriptor`` and ``filler_state.key``. More may be added in
# the future if the need arises. Ultimately, this is necessary because Resource
# documents don't know the shape and dtype of the data that they reference.
# That situation could be improved in the future; to some degree this is a
# work-around.
#
# As an implementation detail, the ``filler_state`` is a ``threading.local``
# object to ensure that filling is thread-safe.
#
# Third-party libraries can register custom coersion options via the
# register_coersion function below. For example, databroker uses this to
# register a 'delayed' option. This avoids introducing dependency on a specific
# delayed-computation framework (e.g. dask) in event-model itself.
def as_is(handler_class, filler_state):
    """Coercion option that applies no transformation.

    The handler class is returned exactly as registered; ``filler_state``
    is ignored.
    """
    return handler_class
def force_numpy(handler_class, filler_state):
    """Coercion option ensuring handler results are actual numpy arrays.

    Returns a subclass of ``handler_class`` whose ``__call__`` converts the
    parent's result with ``numpy.asarray``, so array-likes (e.g. h5py
    datasets) come out as ``numpy.ndarray``. ``filler_state`` is ignored.
    """
    class Subclass(handler_class):
        def __call__(self, *args, **kwargs):
            # Delegate to the real handler, then coerce whatever
            # array-like it produced.
            return numpy.asarray(super().__call__(*args, **kwargs))
    return Subclass
# Maps each valid value of Filler's ``coerce`` argument to the corresponding
# coersion function. Third-party packages may add entries at runtime via
# register_coersion.
_coersion_registry = {'as_is': as_is,
                      'force_numpy': force_numpy}
def register_coersion(name, func, overwrite=False):
    """
    Register a new option for :class:`Filler`'s ``coerce`` argument.

    This is an advanced feature. See source code for comments and examples.

    Parameters
    ----------
    name : string
        The new value for ``coerce`` that will invoke this function.
    func : callable
        Expected signature::

            func(filler, handler_class) -> handler_class

    overwrite : boolean, optional
        False by default. Name collisions will raise ``EventModelValueError``
        unless this is set to ``True``.
    """
    if overwrite or name not in _coersion_registry:
        _coersion_registry[name] = func
        return
    # The name is taken. Re-registering the identical object is harmless,
    # so treat that case as a no-op.
    if _coersion_registry[name] is func:
        return
    raise EventModelValueError(
        f"The coersion function {func} could not be registered for the "
        f"name {name} because {_coersion_registry[name]} is already "
        f"registered. Use overwrite=True to force it.")
class Filler(DocumentRouter):
    """Pass documents through, loading any externally-referenced data.

    It is recommended to use the Filler as a context manager. Because the
    Filler manages caches of potentially expensive resources (e.g. large data
    in memory) managing its lifecycle is important. If used as a context
    manager, it will drop references to its caches upon exit from the
    context. Unless the user holds additional references to those caches, they
    will be garbage collected.

    But for some applications, such as taking multiple passes over the same
    data, it may be useful to keep a longer-lived Filler instance and then
    manually delete it when finished.

    See Examples below.

    Parameters
    ----------
    handler_registry : dict
        Maps each 'spec' (a string identifying a given type or external
        resource) to a handler class.

        A 'handler class' may be any callable with the signature::

            handler_class(full_path, **resource_kwargs)

        It is expected to return an object, a 'handler instance', which is also
        callable and has the following signature::

            handler_instance(**datum_kwargs)

        As the names 'handler class' and 'handler instance' suggest, this is
        typically implemented using a class that implements ``__init__`` and
        ``__call__``, with the respective signatures. But in general it may be
        any callable-that-returns-a-callable.
    include : Iterable
        The set of fields to fill. By default all unfilled fields are filled.
        This parameter is mutually incompatible with the ``exclude`` parameter.
    exclude : Iterable
        The set of fields to skip filling. By default all unfilled fields are
        filled. This parameter is mutually incompatible with the ``include``
        parameter.
    root_map: dict
        str -> str mapping to account for temporarily moved/copied/remounted
        files. Any resources which have a ``root`` in ``root_map`` will be
        loaded using the mapped ``root``.
    coerce : {'as_is', 'force_numpy'}
        Default is 'as_is'. Other options (e.g. 'delayed') may be registered by
        external packages at runtime.
    handler_cache : dict, optional
        A cache of handler instances. If None, a dict is used.
    resource_cache : dict, optional
        A cache of Resource documents. If None, a dict is used.
    datum_cache : dict, optional
        A cache of Datum documents. If None, a dict is used.
    descriptor_cache : dict, optional
        A cache of EventDescriptor documents. If None, a dict is used.
    retry_intervals : Iterable, optional
        If data is not found on the first try, there may a race between the
        I/O systems creating the external data and this stream of Documents
        that reference it. If Filler encounters an ``IOError`` it will wait a
        bit and retry. This list specifies how long to sleep (in seconds)
        between subsequent attempts. Set to ``None`` to try only once before
        raising ``DataNotAccessible``. A subclass may catch this exception and
        implement a different retry mechanism --- for example using a different
        implementation of sleep from an async framework. But by default, a
        sequence of several retries with increasing sleep intervals is used.
        The default sequence should not be considered stable; it may change at
        any time as the authors tune it.

    Raises
    ------
    DataNotAccessible
        If an IOError is raised when loading the data after the configured
        number of attempts. See the ``retry_intervals`` parameter for details.

    Examples
    --------
    A Filler may be used as a context manager.

    >>> with Filler(handler_registry) as filler:
    ...     for name, doc in stream:
    ...         filler(name, doc)  # mutates doc in place
    ...         # Do some analysis or export with name and doc.

    Or as a long-lived object.

    >>> filler = Filler(handler_registry)
    >>> for name, doc in stream:
    ...     filler(name, doc)  # mutates doc in place
    ...     # Do some analysis or export with name and doc.
    ...
    >>> del filler  # Free up memory from potentially large caches.
    """
    def __init__(self, handler_registry, *,
                 include=None, exclude=None, root_map=None, coerce='as_is',
                 handler_cache=None, resource_cache=None, datum_cache=None,
                 descriptor_cache=None, inplace=None,
                 retry_intervals=(0.001, 0.002, 0.004, 0.008, 0.016, 0.032,
                                  0.064, 0.128, 0.256, 0.512, 1.024)):
        if inplace is None:
            self._inplace = True
            warnings.warn(
                "'inplace' argument not specified. It is recommended to "
                "specify True or False. In future releases, 'inplace' "
                "will default to False.")
        else:
            self._inplace = inplace
        if include is not None and exclude is not None:
            raise EventModelValueError(
                "The parameters `include` and `exclude` are mutually "
                "incompatible. At least one must be left as the default, "
                "None.")
        try:
            self._coersion_func = _coersion_registry[coerce]
        except KeyError:
            raise EventModelKeyError(
                f"The option coerce={coerce!r} was given to event_model.Filler. "
                f"The valid options are {set(_coersion_registry)}.")
        self._coerce = coerce
        # See comments on coercion functions above for the use of
        # _current_state, which is passed to coercion functions' `filler_state`
        # parameter.
        self._current_state = threading.local()
        self._unpatched_handler_registry = {}
        self._handler_registry = {}
        for spec, handler_class in handler_registry.items():
            self.register_handler(spec, handler_class)
        self.handler_registry = HandlerRegistryView(self._handler_registry)
        if include is not None:
            warnings.warn(
                "In a future release of event-model, the argument `include` "
                "will be removed from Filler.", DeprecationWarning)
        self.include = include
        if exclude is not None:
            warnings.warn(
                "In a future release of event-model, the argument `exclude` "
                "will be removed from Filler.", DeprecationWarning)
        self.exclude = exclude
        self.root_map = root_map or {}
        if handler_cache is None:
            handler_cache = self.get_default_handler_cache()
        if resource_cache is None:
            resource_cache = self.get_default_resource_cache()
        if datum_cache is None:
            datum_cache = self.get_default_datum_cache()
        if descriptor_cache is None:
            descriptor_cache = self.get_default_descriptor_cache()
        self._handler_cache = handler_cache
        self._resource_cache = resource_cache
        self._datum_cache = datum_cache
        self._descriptor_cache = descriptor_cache
        if retry_intervals is None:
            retry_intervals = []
        self.retry_intervals = retry_intervals
        self._closed = False

    def __eq__(self, other):
        # Equality compares configuration, not cache *contents* --- for the
        # caches only the types are compared.
        return (
            type(self) is type(other) and
            self.inplace == other.inplace and
            self._coerce == other._coerce and
            self.include == other.include and
            self.exclude == other.exclude and
            self.root_map == other.root_map and
            type(self._handler_cache) is type(other._handler_cache) and
            type(self._resource_cache) is type(other._resource_cache) and
            type(self._datum_cache) is type(other._datum_cache) and
            type(self._descriptor_cache) is type(other._descriptor_cache) and
            self.retry_intervals == other.retry_intervals
        )

    def __getstate__(self):
        # Support pickling. The threading.local state and the patched
        # (coerced) handler registry are not serializable; they are rebuilt
        # from the unpatched registry in __setstate__.
        return dict(
            inplace=self._inplace,
            coersion_func=self._coerce,
            handler_registry=self._unpatched_handler_registry,
            include=self.include,
            exclude=self.exclude,
            root_map=self.root_map,
            handler_cache=self._handler_cache,
            resource_cache=self._resource_cache,
            datum_cache=self._datum_cache,
            descriptor_cache=self._descriptor_cache,
            retry_intervals=self.retry_intervals)

    def __setstate__(self, d):
        self._inplace = d['inplace']
        self._coerce = d['coersion_func']
        # BUG FIX: self._coersion_func was previously never restored here,
        # so register_handler (called below), which uses it, raised
        # AttributeError when a Filler was unpickled. Look it up again from
        # the registry, exactly as __init__ does.
        try:
            self._coersion_func = _coersion_registry[self._coerce]
        except KeyError:
            raise EventModelKeyError(
                f"The option coerce={self._coerce!r} was given to "
                f"event_model.Filler. "
                f"The valid options are {set(_coersion_registry)}.")
        # See comments on coercion functions above for the use of
        # _current_state, which is passed to coercion functions' `filler_state`
        # parameter.
        self._current_state = threading.local()
        self._unpatched_handler_registry = {}
        self._handler_registry = {}
        for spec, handler_class in d['handler_registry'].items():
            self.register_handler(spec, handler_class)
        self.handler_registry = HandlerRegistryView(self._handler_registry)
        self.include = d['include']
        self.exclude = d['exclude']
        self.root_map = d['root_map']
        self._handler_cache = d['handler_cache']
        self._resource_cache = d['resource_cache']
        self._datum_cache = d['datum_cache']
        self._descriptor_cache = d['descriptor_cache']
        retry_intervals = d['retry_intervals']
        if retry_intervals is None:
            retry_intervals = []
        self._retry_intervals = retry_intervals
        self._closed = False

    @property
    def retry_intervals(self):
        return self._retry_intervals

    @retry_intervals.setter
    def retry_intervals(self, value):
        self._retry_intervals = list(value)

    def __repr__(self):
        return "<Filler>" if not self._closed else "<Closed Filler>"

    @staticmethod
    def get_default_resource_cache():
        return {}

    @staticmethod
    def get_default_descriptor_cache():
        return {}

    @staticmethod
    def get_default_datum_cache():
        return {}

    @staticmethod
    def get_default_handler_cache():
        return {}

    @property
    def inplace(self):
        return self._inplace

    def clone(self, handler_registry=None, *,
              root_map=None, coerce=None,
              handler_cache=None, resource_cache=None, datum_cache=None,
              descriptor_cache=None, inplace=None,
              retry_intervals=None):
        """
        Create a new Filler instance from this one.

        By default it will be created with the same settings that this Filler
        has. Individual settings may be overridden here.

        The clone does *not* share any caches or internal state with the
        original.
        """
        if handler_registry is None:
            handler_registry = self._unpatched_handler_registry
        if root_map is None:
            root_map = self.root_map
        if coerce is None:
            coerce = self._coerce
        if inplace is None:
            inplace = self.inplace
        if retry_intervals is None:
            retry_intervals = self.retry_intervals
        return Filler(handler_registry, root_map=root_map,
                      coerce=coerce,
                      handler_cache=handler_cache,
                      resource_cache=resource_cache,
                      datum_cache=datum_cache,
                      descriptor_cache=descriptor_cache,
                      inplace=inplace,
                      retry_intervals=retry_intervals)

    def register_handler(self, spec, handler, overwrite=False):
        """Register a handler class for ``spec``.

        Raises DuplicateHandler on a collision unless ``overwrite=True``;
        re-registering the identical object is a silent no-op.
        """
        if (not overwrite) and (spec in self._handler_registry):
            original = self._unpatched_handler_registry[spec]
            if original is handler:
                return
            raise DuplicateHandler(
                f"There is already a handler registered for the spec {spec!r}. "
                f"Use overwrite=True to deregister the original.\n"
                f"Original: {original}\n"
                f"New: {handler}")
        self.deregister_handler(spec)
        # Keep a raw copy, used above for identifying redundant registration.
        self._unpatched_handler_registry[spec] = handler
        # Let the 'coerce' argument to Filler.__init__ modify the handler if it
        # wants to.
        self._handler_registry[spec] = self._coersion_func(
            handler, self._current_state)

    def deregister_handler(self, spec):
        """Remove the handler for ``spec`` (if any) and evict its cached
        handler instances."""
        handler = self._handler_registry.pop(spec, None)
        if handler is not None:
            self._unpatched_handler_registry.pop(spec)
            # Handler-cache keys are (resource_uid, spec) tuples; drop every
            # entry built from this spec.
            for key in list(self._handler_cache):
                resource_uid, spec_ = key
                if spec == spec_:
                    del self._handler_cache[key]

    def resource(self, doc):
        # Defer creating the handler instance until we actually need it, when
        # we fill the first Event field that requires this Resource.
        self._resource_cache[doc['uid']] = doc
        return doc

    # Handlers operate document-wise, so we'll explode pages into individual
    # documents.
    def datum_page(self, doc):
        datum = self.datum  # Avoid attribute lookup in hot loop.
        for datum_doc in unpack_datum_page(doc):
            datum(datum_doc)
        return doc

    def datum(self, doc):
        self._datum_cache[doc['datum_id']] = doc
        return doc

    def event_page(self, doc):
        # TODO We may be able to fill a page in place, and that may be more
        # efficient than unpacking the page in to Events, filling them, and the
        # re-packing a new page. But that seems tricky in general since the
        # page may be implemented as a DataFrame or dict, etc.
        filled_doc = self.fill_event_page(doc, include=self.include,
                                          exclude=self.exclude)
        return filled_doc

    def event(self, doc):
        filled_doc = self.fill_event(doc, include=self.include,
                                     exclude=self.exclude)
        return filled_doc

    def fill_event_page(self, doc, include=None, exclude=None, inplace=None):
        """Fill every Event in an EventPage.

        See fill_event for parameter semantics; ``inplace`` defaults to the
        Filler-wide setting.
        """
        filled_events = []
        for event_doc in unpack_event_page(doc):
            # NOTE(review): the unpacked events are filled with inplace=True
            # even when the outer inplace is False; whether they share
            # references with ``doc`` depends on unpack_event_page --- confirm
            # before relying on the outer copy semantics.
            filled_events.append(self.fill_event(event_doc,
                                                 include=include,
                                                 exclude=exclude,
                                                 inplace=True))
        filled_doc = pack_event_page(*filled_events)
        if inplace is None:
            inplace = self._inplace
        if inplace:
            doc['data'] = filled_doc['data']
            doc['filled'] = filled_doc['filled']
            return doc
        else:
            return filled_doc

    def get_handler(self, resource):
        """
        Return a new Handler instance for this Resource.

        Parameters
        ----------
        resource: dict

        Returns
        -------
        handler: Handler
        """
        if self._closed:
            raise EventModelRuntimeError(
                "This Filler has been closed and is no longer usable.")
        try:
            handler_class = self.handler_registry[resource['spec']]
        except KeyError as err:
            raise UndefinedAssetSpecification(
                f"Resource document with uid {resource['uid']} "
                f"refers to spec {resource['spec']!r} which is "
                f"not defined in the Filler's "
                f"handler registry.") from err
        # Apply root_map.
        resource_path = resource['resource_path']
        original_root = resource.get('root', '')
        root = self.root_map.get(original_root, original_root)
        if root:
            resource_path = os.path.join(root, resource_path)
        msg = (f"Error instantiating handler "
               f"class {handler_class} "
               f"with Resource document {resource}. ")
        if root != original_root:
            msg += (f"Its 'root' field was "
                    f"mapped from {original_root} to {root} by root_map.")
        else:
            msg += (f"Its 'root' field {original_root} was "
                    f"*not* modified by root_map.")
        error_to_raise = EventModelError(msg)
        handler = _attempt_with_retries(
            func=handler_class,
            args=(resource_path,),
            kwargs=resource['resource_kwargs'],
            intervals=[0] + self.retry_intervals,
            error_to_catch=IOError,
            error_to_raise=error_to_raise)
        return handler

    def _get_handler_maybe_cached(self, resource):
        "Get a cached handler for this resource or make one and cache it."
        key = (resource['uid'], resource['spec'])
        try:
            handler = self._handler_cache[key]
        except KeyError:
            handler = self.get_handler(resource)
            self._handler_cache[key] = handler
        return handler

    def fill_event(self, doc, include=None, exclude=None, inplace=None):
        """Fill the externally-stored data referenced by one Event.

        Parameters
        ----------
        doc : dict
            An Event document.
        include, exclude : Iterable, optional
            Mutually exclusive key filters; see the class docstring.
        inplace : boolean, optional
            If None, fall back to the Filler-wide setting.

        Returns
        -------
        filled event : dict
        """
        if inplace is None:
            inplace = self._inplace
        if inplace:
            filled_doc = doc
        else:
            filled_doc = copy.deepcopy(doc)
        descriptor = self._descriptor_cache[doc['descriptor']]
        from_datakeys = False
        # Expose per-fill context to coercion functions (see comments above).
        self._current_state.descriptor = descriptor
        try:
            needs_filling = {key for key, val in doc['filled'].items()
                             if val is False}
        except KeyError:
            # This document is not telling us which, if any, keys are filled.
            # Infer that none of the external data is filled.
            needs_filling = {key for key, val in descriptor['data_keys'].items()
                             if 'external' in val}
            from_datakeys = True
        for key in needs_filling:
            self._current_state.key = key
            if exclude is not None and key in exclude:
                continue
            if include is not None and key not in include:
                continue
            try:
                datum_id = doc['data'][key]
            except KeyError as err:
                if from_datakeys:
                    raise MismatchedDataKeys(
                        "The documents are not valid. Either because they "
                        "were recorded incorrectly in the first place, "
                        "corrupted since, or exercising a yet-undiscovered "
                        "bug in a reader. event['data'].keys() "
                        "must equal descriptor['data_keys'].keys(). "
                        f"event['data'].keys(): {doc['data'].keys()}, "
                        "descriptor['data_keys'].keys(): "
                        f"{descriptor['data_keys'].keys()}") from err
                else:
                    raise MismatchedDataKeys(
                        "The documents are not valid. Either because they "
                        "were recorded incorrectly in the first place, "
                        "corrupted since, or exercising a yet-undiscovered "
                        "bug in a reader. event['filled'].keys() "
                        "must be a subset of event['data'].keys(). "
                        f"event['data'].keys(): {doc['data'].keys()}, "
                        "event['filled'].keys(): "
                        f"{doc['filled'].keys()}") from err
            # Look up the cached Datum doc.
            try:
                datum_doc = self._datum_cache[datum_id]
            except KeyError as err:
                raise UnresolvableForeignKeyError(
                    datum_id,
                    f"Event with uid {doc['uid']} refers to unknown Datum "
                    f"datum_id {datum_id}") from err
            resource_uid = datum_doc['resource']
            # Look up the cached Resource.
            try:
                resource = self._resource_cache[resource_uid]
            except KeyError as err:
                raise UnresolvableForeignKeyError(
                    resource_uid,
                    f"Datum with id {datum_id} refers to unknown Resource "
                    f"uid {resource_uid}") from err
            handler = self._get_handler_maybe_cached(resource)
            error_to_raise = DataNotAccessible(
                f"Filler was unable to load the data referenced by "
                f"the Datum document {datum_doc} and the Resource "
                f"document {resource}.")
            payload = _attempt_with_retries(
                func=handler,
                args=(),
                kwargs=datum_doc['datum_kwargs'],
                intervals=[0] + self.retry_intervals,
                error_to_catch=IOError,
                error_to_raise=error_to_raise)
            # Here we are intentionally modifying doc in place.
            filled_doc['data'][key] = payload
            filled_doc['filled'][key] = datum_id
        self._current_state.key = None
        self._current_state.descriptor = None
        return filled_doc

    def descriptor(self, doc):
        self._descriptor_cache[doc['uid']] = doc
        return doc

    def __enter__(self):
        return self

    def close(self):
        """Mark this Filler as closed and release its caches."""
        # Drop references to the caches. If the user holds another reference to
        # them it's the user's problem to manage their lifecycle. If the user
        # does not (e.g. they are the default caches) the gc will look after
        # them.
        self._closed = True
        self._handler_cache = None
        self._resource_cache = None
        self._datum_cache = None
        # BUG FIX: the descriptor cache was previously left populated after
        # close(), keeping potentially large documents alive; drop it too.
        self._descriptor_cache = None

    def __exit__(self, *exc_details):
        self.close()

    def __call__(self, name, doc, validate=False):
        if self._closed:
            raise EventModelRuntimeError(
                "This Filler has been closed and is no longer usable.")
        return super().__call__(name, doc, validate)
def _attempt_with_retries(func, args, kwargs,
intervals,
error_to_catch, error_to_raise):
"""
Return func(*args, **kwargs), using a retry loop.
func, args, kwargs: self-explanatory
intervals: list
How long to wait (seconds) between each attempt including the first.
error_to_catch: Exception class
If this is raised, retry.
error_to_raise: Exception instance or class
If we run out of retries, raise this from the proximate error.
"""
error = None
for interval in intervals:
ttime.sleep(interval)
try:
return func(*args, **kwargs)
except error_to_catch as error_:
# The file may not be visible on the filesystem yet.
# Wait and try again. Stash the error in a variable
# that we can access later if we run out of attempts.
error = error_
else:
break
else:
# We have used up all our attempts. There seems to be an
# actual problem. Raise specified error from the error stashed above.
raise error_to_raise from error
class NoFiller(Filler):
"""
This does not fill the documents; it merely validates them.
It checks that all the references between the documents are resolvable and
*could* be filled. This is useful when the filling will be done later, as
a delayed computation, but we want to make sure in advance that we have all
the information that we will need when that computation occurs.
"""
def __init__(self, *args, **kwargs):
# Do not make Filler make copies because we are not going to alter the
# documents anyway.
kwargs.setdefault('inplace', True)
super().__init__(*args, **kwargs)
def fill_event_page(self, doc, include=None, exclude=None):
filled_events = []
for event_doc in unpack_event_page(doc):
filled_events.append(self.fill_event(event_doc,
include=include,
exclude=exclude,
inplace=True))
filled_doc = pack_event_page(*filled_events)
return filled_doc
def fill_event(self, doc, include=None, exclude=None, inplace=None):
descriptor = self._descriptor_cache[doc['descriptor']]
from_datakeys = False
try:
needs_filling = {key for key, val in doc['filled'].items()
if val is False}
except KeyError:
# This document is not telling us which, if any, keys are filled.
# Infer that none of the external data is filled.
needs_filling = {key for key, val in descriptor['data_keys'].items()
if 'external' in val}
from_datakeys = True
for key in needs_filling:
if exclude is not None and key in exclude:
continue
if include is not None and key not in include:
continue
try:
datum_id = doc['data'][key]
except KeyError as err:
if from_datakeys:
raise MismatchedDataKeys(
"The documents are not valid. Either because they "
"were recorded incorrectly in the first place, "
"corrupted since, or exercising a yet-undiscovered "
"bug in a reader. event['data'].keys() "
"must equal descriptor['data_keys'].keys(). "
f"event['data'].keys(): {doc['data'].keys()}, "
"descriptor['data_keys'].keys(): "
f"{descriptor['data_keys'].keys()}") from err
else: