-
Notifications
You must be signed in to change notification settings - Fork 8
/
polling2.py
234 lines (165 loc) · 9.75 KB
/
polling2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""Polling2 module containing all exceptions and helpers used for the polling function
Never write another polling function again.
"""
__version__ = '0.5.0'
from functools import wraps
import logging
import time
try:
from Queue import Queue
except ImportError:
from queue import Queue
LOGGER = logging.getLogger(__name__)
class PollingException(Exception):
"""Base exception that stores all return values of attempted polls"""
def __init__(self, values, last=None):
self.values = values
self.last = last
class TimeoutException(PollingException):
"""Exception raised if polling function times out"""
class MaxCallException(PollingException):
"""Exception raised if maximum number of iterations is exceeded"""
def step_constant(step):
"""Use this function when you want the step to remain fixed in every iteration
(typically good for instances when you know approximately how long the function should poll for)
:param step: a number
:return: step
"""
return step
def step_linear_double(step):
"""Use this function when you want the step to double each iteration
(e.g. like the way ArrayList works in Java).
Note that this can result in very long poll times after a few iterations
:param step: a number, that is doubled
:return: double of step
"""
return step * 2
def is_truthy(val):
"""Use this function to test if a return value is truthy
:return: boolean
"""
return bool(val)
def is_value(val):
"""Use this function to create a custom checker.
:param val: Whatever val is, the checker checks that whatever is returned from that target is that value.
:return: checker function testing if parameter is val, call checker to get a boolean
"""
def checker(_val):
return val is _val
return checker
def log_value(check_success, level=logging.DEBUG):
"""A decorator for a check_success function that logs the return_value passed to check_success.
:param level: (optional) the level at which to log the return value, defaults to debug. Must be
one of the values in logging._levelNames (i.e. an int or a string).
:return: decorator check_success function.
"""
def wrap_check_success(return_val):
LOGGER.log(level, "poll() calls check_success(%s)", return_val)
return check_success(return_val)
return wrap_check_success
def poll(target, step, args=(), kwargs=None, timeout=None, max_tries=None, check_success=is_truthy,
step_function=step_constant, ignore_exceptions=(), poll_forever=False, collect_values=None,
log=logging.NOTSET, log_error=logging.NOTSET):
"""Poll by calling a target function until a certain condition is met.
You must specify at least a target function to be called and the step -- base wait time between each function call.
:param step: Step defines the amount of time to wait (in seconds)
:param args: Arguments to be passed to the target function
:type kwargs: dict
:param kwargs: Keyword arguments to be passed to the target function
:param timeout: The target function will be called until the time elapsed is greater than the maximum timeout
(in seconds). NOTE timeout == 0 or timeout == None is equivalent to setting poll_forever=True.
NOTE that the actual execution time of the function *can* exceed the time specified in the timeout.
For instance, if the target function takes 10 seconds to execute and the timeout is 21 seconds, the polling
function will take a total of 30 seconds (two iterations of the target --20s which is less than the timeout--21s,
and a final iteration).
:param max_tries: Maximum number of times the target function will be called before failing
:param check_success: A callback function that accepts the return value of the target function. It should
return true if you want the polling function to stop and return this value. It should return false if you want it
to continue executing. The default is a callback that tests for truthiness (anything not False, 0, or empty
collection).
:param step_function: A callback function that accepts each iteration's "step." By default, this is constant,
but you can also pass a function that will increase or decrease the step. As an example, you can increase the wait
time between calling the target function by 10 seconds every iteration until the step is 100 seconds--at which
point it should remain constant at 100 seconds
>>> def my_step_function(step):
>>> step += 10
>>> return max(step, 100)
:type ignore_exceptions: tuple
:param ignore_exceptions: You can specify a tuple of exceptions that should be caught and ignored on every
iteration. If the target function raises one of these exceptions, it will be caught and the exception
instance will be pushed to the queue of values collected during polling. Any other exceptions raised will be
raised as normal.
:param poll_forever: If set to true, this function will retry until an exception is raised or the target's
return value satisfies the check_success function. If this is not set, then a timeout or a max_tries must be set.
:type collect_values: Queue
:param collect_values: By default, polling will create a new Queue to store all of the target's return values.
Optionally, you can specify your own queue to collect these values for access to it outside of function scope.
:type log: int or str, one of logging._levelNames
:param log: (optional) By default, return values passed to check_success are not logged. However, if this param is
set to a log level greater than NOTSET, then the return values passed to check_success will be logged.
This is done by using the decorator log_value.
:type log_error: int or str, one of logging._levelNames
:param log_level: (optional) If ignore_exception has been set, you might want to log the exceptions that are
ignored. If the log_error level is greater than NOTSET, then any caught exceptions will be logged at that
level. Note: the logger.exception() function is not used. That would print the stacktrace in the logs. Because
you are ignoring these exceptions, it seems unlikely that'd you'd want a full stack trace for each exception.
However, if you do what this, you can retrieve the exceptions using the collect_values parameter.
:return: Polling will return first value from the target function that meets the condions of the check_success
callback. By default, this will be the first value that is not None, 0, False, '', or an empty collection.
Note: a message is written to polling2 logger when poll() is called. This logs a message like so:
>>> Begin poll(target=<>, step=<>, timeout=<>, max_tries=<>, poll_forever=<>)
This message should allow a user to work-out how long the poll could take, and thereby detect a hang in real-time
if the poll takes longer than it should.
"""
assert (timeout is not None or max_tries is not None) or poll_forever, \
('You did not specify a maximum number of tries or a timeout. Without either of these set, the polling '
'function will poll forever. If this is the behavior you want, pass "poll_forever=True"')
assert not ((timeout is not None or max_tries is not None) and poll_forever), \
'You cannot specify both the option to poll_forever and max_tries/timeout.'
kwargs = kwargs or dict()
values = collect_values or Queue()
timeout = time.time() + timeout if timeout else None
tries = 0
# Always log what polling is about to take place.
msg = ("Begin poll(target=%s, step=%s, timeout=%s, max_tries=%s, poll_forever=%s)")
LOGGER.debug(msg, target, step, timeout, max_tries, poll_forever)
if log:
check_success = log_value(check_success, level=log)
last_item = None
while True:
if max_tries is not None and tries >= max_tries:
raise MaxCallException(values, last_item)
try:
val = target(*args, **kwargs)
last_item = val
except ignore_exceptions as e:
last_item = e
if log_error: # NOTSET is 0, so it'll evaluate to False.
LOGGER.log(log_error, "poll() ignored exception %r", e)
else:
# Condition passes, this is the only "successful" exit from the polling function
if check_success(val):
return val
values.put(last_item)
tries += 1
# Check the max tries at this point so it will not sleep before raising the exception
if max_tries is not None and tries >= max_tries:
raise MaxCallException(values, last_item)
# Check the time after to make sure the poll function is called at least once
if timeout is not None and time.time() >= timeout:
raise TimeoutException(values, last_item)
time.sleep(step)
step = step_function(step)
def poll_decorator(step, timeout=None, max_tries=None, check_success=is_truthy,
step_function=step_constant, ignore_exceptions=(), poll_forever=False,
collect_values=None, log=logging.NOTSET, log_error=logging.NOTSET):
"""Use poll() as a decorator.
:return: decorator using poll()"""
def decorator(target):
@wraps(target)
def wrapper(*args, **kwargs):
return poll(target=target, step=step, args=args, kwargs=kwargs, timeout=timeout, max_tries=max_tries,
check_success=check_success, step_function=step_function, ignore_exceptions=ignore_exceptions,
poll_forever=poll_forever, collect_values=collect_values, log=log, log_error=log_error)
return wrapper
return decorator