/
event.py
85 lines (68 loc) · 2.5 KB
/
event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import json
import uuid
from typing import Any
from pyrsistent import field
from pyrsistent import freeze
from pyrsistent import m
from pyrsistent import PMap # type: ignore
from pyrsistent import pmap
from pyrsistent import PRecord
# Allowed values for Event.kind; the invariant on the `kind` field rejects
# any value that is not a member of this set.
EVENT_KINDS = {'task', 'control'}
# Keys that Event.__invariant__ requires to be present on a record whose
# kind is 'task'.
EVENT_TASK_ATTRS = {'task_id', 'task_config'}
class Event(PRecord):
    """Immutable record describing a single task or control event.

    The ``kind`` field is mandatory and must be one of ``EVENT_KINDS``.
    Records whose kind is ``'task'`` are additionally checked by
    ``__invariant__`` for the keys listed in ``EVENT_TASK_ATTRS``.
    """

    def __invariant__(self):
        """Record-wide check; returns an ``(ok, message)`` tuple.

        Non-task events always pass. Task events pass only when every key
        in ``EVENT_TASK_ATTRS`` is present on the record.
        """
        if self.kind == 'task':
            present = set(self.keys())
            absent = EVENT_TASK_ATTRS - present
            if absent:
                return False, 'Missing task attributes: {}'.format(absent)
            return True, 'Task is valid'
        return True, 'Not a task event'

    # Event category; must be a string found in EVENT_KINDS.
    kind = field(
        type=str,
        mandatory=True,
        invariant=lambda value: (
            value in EVENT_KINDS,
            'kind not in {}'.format(EVENT_KINDS),
        ),
    )

    # Timestamps are stored as seconds since the epoch;
    # use time.time() to generate one.
    timestamp = field(type=float, initial=0.0)

    # Reference to the platform-specific event object.
    raw: Any = field(mandatory=True, initial=None)

    # Free-form dictionary for stack-specific data (coerced with pmap).
    extensions = field(type=PMap, initial=m(), factory=pmap)

    # Is this the last event for a task?
    terminal = field(type=bool, initial=False)

    # --- task-specific fields ---

    # ID of the task this event pertains to.
    task_id = field(type=(str, type(None)), initial=None)

    # Task config dict that sourced the task this event refers to.
    # Values that are not already a PMap are converted with pmap().
    task_config = field(
        invariant=lambda value: (
            isinstance(value, PMap),
            'task_config must inherit from PMap',
        ),
        factory=lambda value: value if isinstance(value, PMap) else pmap(value),
        initial=m(),
    )

    # The task finished with exit code 0.
    success = field(type=(bool, type(None)), initial=None)

    # Platform-specific event type.
    platform_type = field(type=(str, type(None)), initial=None)

    # --- control events ---

    message = field(type=(str, type(None)), initial=None)
def task_event(**kwargs):
    """Build an ``Event`` from ``kwargs``, using kind ``'task'`` by default.

    A ``kind`` supplied by the caller is left untouched.
    """
    if 'kind' not in kwargs:
        kwargs['kind'] = 'task'
    return Event(**kwargs)
def control_event(**kwargs):
    """Build an ``Event`` from ``kwargs``, using kind ``'control'`` by default.

    A ``kind`` supplied by the caller is left untouched.
    """
    if 'kind' not in kwargs:
        kwargs['kind'] = 'control'
    return Event(**kwargs)
def json_serializer(o):
    """Encode objects the stock JSON encoder cannot handle.

    Usable as the ``default=`` hook of ``json.dumps``.

    Args:
        o: The object the encoder could not serialize.

    Returns:
        The 32-character hex string of ``o`` when it is a ``uuid.UUID``.

    Raises:
        TypeError: For any other object, with the standard
            "... is not JSON serializable" message.
    """
    if isinstance(o, uuid.UUID):
        return o.hex
    # JSONEncoder.default is an instance method. Calling it on the class with
    # a single argument binds ``o`` as ``self`` and fails with a misleading
    # "missing 1 required positional argument" error, so call it on an
    # encoder instance to get the proper "not JSON serializable" TypeError.
    return json.JSONEncoder().default(o)
def json_deserializer(dct):
    """Post-process a decoded JSON object, replacing its values in place.

    The value stored under the key ``"uuid"`` is parsed into a
    ``uuid.UUID`` when it is a valid hex string. Every other value, and a
    ``"uuid"`` value that cannot be parsed, is passed through ``freeze``.

    Args:
        dct: A dict produced by the JSON decoder; it is mutated.

    Returns:
        The same ``dct`` object, with converted values.
    """
    # Only existing keys are reassigned, so the dict size never changes and
    # iterating over items() while writing is safe.
    for key, value in dct.items():
        if key == "uuid":
            try:
                dct[key] = uuid.UUID(hex=value)
            except (ValueError, TypeError, AttributeError):
                # ValueError: malformed hex string.
                # TypeError / AttributeError: the value is not a string at
                # all (e.g. JSON null, a number, a list). Fall back to the
                # generic conversion instead of aborting the whole decode.
                dct[key] = freeze(value)
        else:
            dct[key] = freeze(value)
    return dct