-
Notifications
You must be signed in to change notification settings - Fork 74
/
Copy pathpolling.py
250 lines (191 loc) · 6.81 KB
/
polling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
import datetime
from typing import Any, Dict, List, Optional
import polling
from linode_api4.objects import Event
class EventError(Exception):
"""
Represents a failed Linode event.
"""
def __init__(self, event_id: int, message: Optional[str]):
# Edge case, sometimes the message is populated with an empty string
if message is not None and len(message) < 1:
message = None
self.event_id = event_id
self.message = message
error_fmt = f"Event {event_id} failed"
if message is not None:
error_fmt += f": {message}"
super().__init__(error_fmt)
class TimeoutContext:
"""
TimeoutContext should be used by polling resources to track their provisioning time.
"""
def __init__(self, timeout_seconds=120):
self._start_time = datetime.datetime.now()
self._timeout_seconds = timeout_seconds
def start(self, start_time=datetime.datetime.now()):
"""
Sets the timeout start time to the current time.
:param start_time: The moment when the context started.
:type start_time: datetime
"""
self._start_time = start_time
def extend(self, seconds: int):
"""
Extends the timeout window.
:param seconds: The number of seconds to extend the timeout period by.
:type seconds: int
"""
self._timeout_seconds += seconds
@property
def expired(self):
"""
Whether the current timeout period has been exceeded.
:returns: Whether this context is expired.
:rtype: bool
"""
return self.seconds_remaining < 0
@property
def valid(self):
"""
Whether the current timeout period has not been exceeded.
:returns: Whether this context is valid.
:rtype: bool
"""
return not self.expired
@property
def seconds_remaining(self):
"""
The number of seconds until the timeout period has expired.
:returns: The number of seconds remaining in this context.
:rtype: int
"""
return self._timeout_seconds - self.seconds_since_started
@property
def seconds_since_started(self):
"""
The number of seconds since the timeout period started.
:returns: The number of seconds since the context started.
:rtype: int
"""
return (datetime.datetime.now() - self._start_time).seconds
class EventPoller:
"""
EventPoller allows modules to dynamically poll for Linode events
"""
def __init__(
self,
client: "LinodeClient",
entity_type: str,
action: str,
entity_id: Optional[int] = None,
):
self._client = client
self._entity_type = entity_type
self._entity_id = entity_id
self._action = action
# Initialize with an empty cache if no entity is specified
if self._entity_id is None:
self._previous_event_cache = {}
return
# We only want the first page of this response
result = client.get("/account/events", filters=self._build_filter())
self._previous_event_cache = {v["id"]: v for v in result["data"]}
def _build_filter(self) -> Dict[str, Any]:
"""Generates a filter dict to use in HTTP requests"""
return {
"+order": "asc",
"+order_by": "created",
"entity.id": self._entity_id,
"entity.type": self._entity_type,
"action": self._action,
}
def set_entity_id(self, entity_id: int) -> None:
"""
Sets the ID of the entity to filter on.
This is useful for create operations where
the entity id might not be known in __init__.
:param entity_id: The ID of the entity to poll for.
:type entity_id: int
"""
self._entity_id = entity_id
def _attempt_merge_event_into_cache(self, event: Dict[str, Any]):
"""
Attempts to merge the given event into the event cache.
"""
if event["id"] in self._previous_event_cache:
return
self._previous_event_cache[event["id"]] = event
def _check_has_new_event(
self, events: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""
If a new event is found in the given list, return it.
"""
for event in events:
# Ignore cached events
if event["id"] in self._previous_event_cache:
continue
return event
return None
def wait_for_next_event(
self, timeout: int = 240, interval: int = 5
) -> Event:
"""
Waits for and returns the next event matching the
poller's configuration.
:param timeout: The timeout in seconds before this polling operation will fail.
:type timeout: int
:param interval: The time in seconds to wait between polls.
:type interval: int
:returns: The resulting event.
:rtype: Event
"""
result_event: Dict[str, Any] = {}
def poll_func():
new_event = self._check_has_new_event(
self._client.get(
"/account/events", filters=self._build_filter()
)["data"]
)
event_exists = new_event is not None
if event_exists:
nonlocal result_event
result_event = new_event
self._attempt_merge_event_into_cache(new_event)
return event_exists
if poll_func():
return Event(self._client, result_event["id"], json=result_event)
polling.poll(
poll_func,
step=interval,
timeout=timeout,
)
return Event(self._client, result_event["id"], json=result_event)
def wait_for_next_event_finished(
self, timeout: int = 240, interval: int = 5
) -> Event:
"""
Waits for the next event to enter status `finished` or `notification`.
:param timeout: The timeout in seconds before this polling operation will fail.
:type timeout: int
:param interval: The time in seconds to wait between polls.
:type interval: int
:returns: The resulting event.
:rtype: Event
"""
timeout_ctx = TimeoutContext(timeout_seconds=timeout)
event = self.wait_for_next_event(timeout_ctx.seconds_remaining)
def poll_func():
event._api_get()
if event.status == "failed":
raise EventError(event.id, event.message)
return event.status in ["finished", "notification"]
if poll_func():
return event
polling.poll(
poll_func,
step=interval,
timeout=timeout_ctx.seconds_remaining,
)
return event