/
runner.py
184 lines (142 loc) · 6.07 KB
/
runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import collections.abc
import functools
from typing import Any, Callable, Dict, Iterable, Optional, Tuple
import prefect
from prefect.engine import signals
from prefect.engine.state import Failed, Pending, State
from prefect.utilities import logging
# for backwards compatibility
ENDRUN = signals.ENDRUN
def call_state_handlers(method: Callable[..., State]) -> Callable[..., State]:
"""
Decorator that calls the Runner's `handle_state_change()` method.
If used on a Runner method that has the signature:
method(self, state: State, *args, **kwargs) -> State
this decorator will inspect the provided State and the returned State and call
the Runner's `handle_state_change()` method if they are different.
For example:
```python
@call_state_handlers
def check_if_task_is_pending(self, state: State):
if not state.is_pending()
return Failed()
return state
```
Args:
- method (Callable): a Runner method with the signature:
method(self, state: State, *args, **kwargs) -> State
Returns:
Callable: a decorated method that calls Runner.handle_state_change() if the
state it returns is different than the state it was passed.
"""
@functools.wraps(method)
def inner(self: "Runner", state: State, *args: Any, **kwargs: Any) -> State:
raise_end_run = False
raise_on_exception = prefect.context.get("raise_on_exception", False)
try:
new_state = method(self, state, *args, **kwargs)
except ENDRUN as exc:
raise_end_run = True
new_state = exc.state
# PrefectStateSignals are trapped and turned into States
except signals.PrefectStateSignal as exc:
self.logger.info(
"{name} signal raised: {rep}".format(
name=type(exc).__name__, rep=repr(exc)
)
)
if raise_on_exception:
raise exc
new_state = exc.state
except Exception as exc:
formatted = "Unexpected error: {}".format(repr(exc))
self.logger.exception(formatted, exc_info=True)
if raise_on_exception:
raise exc
new_state = Failed(formatted, result=exc)
if new_state is not state:
new_state = self.handle_state_change(old_state=state, new_state=new_state)
# if an ENDRUN was raised, reraise so it can be trapped
if raise_end_run:
raise ENDRUN(new_state)
return new_state
return inner
class Runner:
def __init__(self, state_handlers: Iterable[Callable] = None):
if state_handlers is not None and not isinstance(
state_handlers, collections.abc.Sequence
):
raise TypeError("state_handlers should be iterable.")
self.state_handlers = state_handlers or []
self.logger = logging.get_logger(type(self).__name__)
def __repr__(self) -> str:
return '<"Runner">'
def _heartbeat(self) -> bool:
return False
def initialize_run(
self, state: Optional[State], context: Dict[str, Any]
) -> Tuple[State, Dict[str, Any]]:
"""
Initializes the Task run by initializing state and context appropriately.
If the provided state is a meta state, the state it wraps is extracted.
Args:
- state (Optional[State]): the initial state of the run
- context (dict): the context to be updated with relevant information
Returns:
- tuple: a tuple of the updated state and context objects
"""
# extract possibly nested meta states -> for example a Submitted( Queued( Retry ) )
while isinstance(state, State) and state.is_meta_state():
state = state.state # type: ignore
state = state or Pending()
return state, context
def call_runner_target_handlers(self, old_state: State, new_state: State) -> State:
"""
Runners are used to execute a target object, usually a `Task` or a `Flow`, and those
objects may have state handlers of their own. This method will always be called as
the Runner's first state handler, and provides an entrypoint that can be overriden
to target either a Task or Flow's own handlers.
Args:
- old_state (State): the old (previous) state
- new_state (State): the new (current) state
Returns:
State: the new state
"""
return new_state
def handle_state_change(self, old_state: State, new_state: State) -> State:
"""
Calls any handlers associated with the Runner
This method will only be called when the state changes (`old_state is not new_state`)
Args:
- old_state (State): the old (previous) state of the task
- new_state (State): the new (current) state of the task
Returns:
State: the updated state of the task
Raises:
- PAUSE: if raised by a handler
- ENDRUN: if raised by a handler
- ENDRUN(Failed()): if any of the handlers fail unexpectedly
"""
raise_on_exception = prefect.context.get("raise_on_exception", False)
try:
# call runner's target handlers
new_state = self.call_runner_target_handlers(old_state, new_state)
# call runner's own handlers
for handler in self.state_handlers:
new_state = handler(self, old_state, new_state) or new_state
# raise pauses and ENDRUNs
except (signals.PAUSE, ENDRUN):
raise
# trap signals
except signals.PrefectStateSignal as exc:
if raise_on_exception:
raise
return exc.state
# abort on errors
except Exception as exc:
if raise_on_exception:
raise
msg = "Unexpected error while calling state handlers: {}".format(repr(exc))
self.logger.exception(msg)
raise ENDRUN(Failed(msg, result=exc)) from exc
return new_state