-
Notifications
You must be signed in to change notification settings - Fork 1
/
execution.py
376 lines (317 loc) · 14.2 KB
/
execution.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Hive Flask Quorum
# Copyright (c) 2008-2020 Hive Solutions Lda.
#
# This file is part of Hive Flask Quorum.
#
# Hive Flask Quorum is free software: you can redistribute it and/or modify
# it under the terms of the Apache License as published by the Apache
# Foundation, either version 2.0 of the License, or (at your option) any
# later version.
#
# Hive Flask Quorum is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache License for more details.
#
# You should have received a copy of the Apache License along with
# Hive Flask Quorum. If not, see <http://www.apache.org/licenses/>.
__author__ = "João Magalhães <joamag@hive.pt>"
""" The author(s) of the module """
__version__ = "1.0.0"
""" The version of the module """
__revision__ = "$LastChangedRevision$"
""" The revision number of the module """
__date__ = "$LastChangedDate$"
""" The last change date of the module """
__copyright__ = "Copyright (c) 2008-2020 Hive Solutions Lda."
""" The copyright for the module """
__license__ = "Apache License, Version 2.0"
""" The license for the module """
import time
import heapq
import calendar
import datetime
import threading
from . import log
# names of the functions already registered through the background
# decorator, consulted by it to skip duplicate registrations
BACKGROUND = []
""" The list containing the various global registered
functions to be executed as background operations, note
that only the name is used in the list so a possible
collision of tasks is possible """
# pause (in seconds) applied by the execution thread at the end
# of every iteration of its main loop
SLEEP_TIME = 0.5
""" The amount of time to sleep between iteration
this amount should be small enough to provide some
resolution level to the schedule execution """
# thread instance used by the module level insert_work function,
# NOTE(review): it is not assigned anywhere in the visible code,
# presumably set by the package startup logic - confirm
background_t = None
""" The background execution task to be started by
the quorum execution system (global value) """
class ExecutionThread(threading.Thread):
    """
    The thread to be used in the execution of "random"
    "callables" once a provided target time has passed,
    the pending work is kept in a heap ordered by target
    time and every access to it is guarded by a lock.

    Work units scheduled for the same target time are run
    in insertion order.
    """

    run_flag = True
    """ The flag that controls the running operations
    of the execution thread, once this value is unset
    the thread is exited """

    work_list = []
    """ The heap containing the various work descriptors
    for the work to be done, this work is going to be
    run in a single thread (in sequence), the value is
    replaced by a per instance list in the constructor """

    work_lock = None
    """ The lock that control the access to the list of
    work to be executed """

    work_sequence = 0
    """ The number of work units inserted so far, used
    as the tie breaker between work units that share the
    same target time """

    def __init__(self):
        """
        Constructor of the class.
        """

        threading.Thread.__init__(self, name = "Execution")

        self.daemon = True
        self.work_list = []
        self.work_lock = threading.RLock()
        self.work_sequence = 0

    def run(self):
        """
        Main loop of the thread, on every iteration collects the
        work units whose target time has passed, runs them in
        sequence (outside the lock) and then sleeps for a while.

        An exception raised by a work unit is logged and handed
        to the unit's callback (if any) as the ``error`` argument.
        """

        # iterates continuously (executing work)
        # while the run flag is set
        while self.run_flag:
            # the due work is first moved into a local list so that
            # the lock is not held while the "callables" are running
            due = []

            with self.work_lock:
                # reference time used to decide which of the
                # work units are already due for execution
                current_time = time.time()

                # the heap keeps the earliest work at the head, so the
                # collection stops at the first unit that is not yet due
                while self.work_list:
                    head = self.work_list[0]
                    if not head[0] < current_time: break
                    heapq.heappop(self.work_list)
                    due.append(head[2:])

            for task, callback, args, kwargs in due:
                # the error is only set in case the execution fails
                error = None

                # failures are logged and never propagated, otherwise
                # a single failing unit would terminate the thread
                try:
                    task(*args, **kwargs)
                except Exception as exception:
                    error = exception
                    log.warning(str(exception), log_trace = True)

                # notifies the (optional) callback about the outcome
                if callback: callback(error = error)

            # sleeps for a while so that the process may
            # be released for different tasks
            time.sleep(SLEEP_TIME)

    def stop(self):
        """
        Unsets the run flag, the main loop exits once the
        iteration currently in progress finishes.
        """

        self.run_flag = False

    def insert_work(self, callable, args = None, kwargs = None, target_time = None, callback = None):
        """
        Schedules the provided callable for execution by this thread.

        :type callable: Function
        :param callable: The callable object to be executed.
        :type args: List
        :param args: The unnamed argument values to be sent to the\
        callable, defaults to no arguments.
        :type kwargs: Dictionary
        :param kwargs: The named argument values to be sent to the\
        callable, defaults to no arguments.
        :type target_time: float
        :param target_time: The timestamp after which the callable is\
        executed, the current time is used when not provided.
        :type callback: Function
        :param callback: The optional function called after the execution\
        with the ``error`` named argument (exception or none).
        """

        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        target_time = target_time or time.time()

        with self.work_lock:
            # the sequence number sits right after the time so that
            # units sharing the same target time are ordered by
            # insertion and the heap never compares the "callables"
            # or the arguments (not orderable under python 3)
            self.work_sequence += 1
            work = (target_time, self.work_sequence, callable, callback, args, kwargs)
            heapq.heappush(self.work_list, work)
def background(timeout = None):
    """
    Decorator that registers the decorated function for execution
    in the background execution thread.

    The function is scheduled once at decoration time. When a
    ``timeout`` is provided the function is scheduled again that
    number of seconds after each execution finishes, otherwise it
    runs a single time.

    Registration is keyed by the function name, so a second function
    with an already registered name is returned without being scheduled.

    :type timeout: float
    :param timeout: The number of seconds between the end of an\
    execution and the next one, no repetition when not provided.
    :rtype: Function
    :return: The decorator, which returns the function unchanged.
    """

    def decorator(function):
        _timeout = timeout or 0.0

        def schedule(error = None, force = False):
            # without a timeout the work is only scheduled when
            # forced (the initial registration), this function is
            # also the completion callback so returning here is
            # what stops the repetition
            if timeout is None and not force: return
            target = time.time() + _timeout

            # the time and the callback must be passed by name, in
            # positional form they would bind to the args and kwargs
            # parameters of the insert work function
            insert_work(function, target_time = target, callback = schedule)

        # in case the name already exists in the global list of
        # background tasks returns immediately (duplicate)
        fname = function.__name__
        if fname in BACKGROUND: return function

        # runs the scheduling operation on the task and then adds
        # the function name to the list of registered names
        schedule(force = True)
        BACKGROUND.append(fname)
        return function

    return decorator
def insert_work(callable, args = None, kwargs = None, target_time = None, callback = None):
    """
    Runs the provided callable (function, method, etc) in a separated
    thread context under submission of a queue system.

    It's possible to control the runtime for the execution with the
    ``target_time`` argument and it's also possible to be notified
    of the end of the execution providing a callable to the ``callback``
    parameter.

    The work is handed to the global background thread (``background_t``)
    which must have been set before this function is called.

    .. warning::

        The execution is not guaranteed as the system process may be
        interrupted and resuming of the execution would not be possible.

    :type callable: Function
    :param callable: The callable object to be called in a separated\
    execution environment.
    :type args: List
    :param args: The list of unnamed argument values to be send to the\
    callable upon execution, defaults to no arguments.
    :type kwargs: Dictionary
    :param kwargs: The dictionary of named argument values to be send to the\
    callable upon execution, defaults to no arguments.
    :type target_time: float
    :param target_time: The target timestamp value for execution, in case\
    it's not provided the current time is used as the target one.
    :type callback: Function
    :param callback: The callback function to be called upon finishing the\
    execution of the callable, in case an error (exception) occurs while\
    executing the callable it is passed as the error argument.
    """

    # normalizes the optional arguments here so that fresh containers
    # are used on each call instead of shared default values
    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs

    background_t.insert_work(
        callable,
        args = args,
        kwargs = kwargs,
        target_time = target_time,
        callback = callback
    )
def interval_work(
    callable,
    args = None,
    kwargs = None,
    callback = None,
    initial = None,
    interval = 60,
    eval = None
):
    """
    Schedules the provided callable for recurring execution.

    The callable is wrapped (see ``build_composed``) so that every
    execution inserts the following one into the work queue. The time
    of each following execution is the result of ``eval`` when that
    function is provided, otherwise it is derived from ``interval``.

    :type callable: Function
    :param callable: The callable object to be executed repeatedly.
    :type args: List
    :param args: The unnamed argument values to be sent to the callable\
    on every execution, defaults to no arguments.
    :type kwargs: Dictionary
    :param kwargs: The named argument values to be sent to the callable\
    on every execution, defaults to no arguments.
    :type callback: Function
    :param callback: The optional function to be called after every execution.
    :type initial: float
    :param initial: The timestamp of the first execution, when not provided\
    the result of ``eval`` is used, falling back to the current time.
    :type interval: float
    :param interval: The number of seconds between executions, only\
    used when ``eval`` is not provided.
    :type eval: Function
    :param eval: The optional function (no arguments) that returns the\
    timestamp of the next execution.
    :rtype: float
    :return: The timestamp used for the first execution.
    """

    # uses fresh containers on each call instead of shared defaults
    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs

    initial = initial or (eval and eval()) or time.time()
    composed = build_composed(callable, initial, interval, eval, callback)
    insert_work(
        composed,
        args = args,
        kwargs = kwargs,
        target_time = initial,
        callback = callback
    )
    return initial
def seconds_work(callable, offset = 0, *args, **kwargs):
    """
    Schedules the callable through ``interval_work`` with every
    execution time computed by ``seconds_eval`` for the provided
    offset, the remaining arguments are forwarded unchanged.

    :rtype: float
    :return: The value returned by ``interval_work``.
    """

    def next_time():
        return seconds_eval(offset)

    return interval_work(callable, *args, eval = next_time, **kwargs)
def minutes_work(callable, offset = 0, *args, **kwargs):
    """
    Schedules the callable through ``interval_work`` with every
    execution time computed by ``minutes_eval`` for the provided
    offset, the remaining arguments are forwarded unchanged.

    :rtype: float
    :return: The value returned by ``interval_work``.
    """

    def next_time():
        return minutes_eval(offset)

    return interval_work(callable, *args, eval = next_time, **kwargs)
def hourly_work(callable, offset = 0, *args, **kwargs):
    """
    Schedules the callable through ``interval_work`` with every
    execution time computed by ``hourly_eval`` for the provided
    offset, the remaining arguments are forwarded unchanged.

    :rtype: float
    :return: The value returned by ``interval_work``.
    """

    def next_time():
        return hourly_eval(offset)

    return interval_work(callable, *args, eval = next_time, **kwargs)
def daily_work(callable, offset = 0, *args, **kwargs):
    """
    Schedules the callable through ``interval_work`` with every
    execution time computed by ``daily_eval`` for the provided
    offset, the remaining arguments are forwarded unchanged.

    :rtype: float
    :return: The value returned by ``interval_work``.
    """

    def next_time():
        return daily_eval(offset)

    return interval_work(callable, *args, eval = next_time, **kwargs)
def weekly_work(callable, weekday = 4, offset = 0, *args, **kwargs):
    """
    Schedules the callable through ``interval_work`` with every
    execution time computed by ``weekly_eval`` for the provided
    weekday and offset, the remaining arguments are forwarded
    unchanged.

    :rtype: float
    :return: The value returned by ``interval_work``.
    """

    def next_time():
        return weekly_eval(weekday, offset)

    return interval_work(callable, *args, eval = next_time, **kwargs)
def monthly_work(callable, monthday = 1, offset = 0, *args, **kwargs):
    """
    Schedules the callable through ``interval_work`` with every
    execution time computed by ``monthly_eval`` for the provided
    day of the month and offset, the remaining arguments are
    forwarded unchanged.

    :rtype: float
    :return: The value returned by ``interval_work``.
    """

    def next_time():
        return monthly_eval(monthday, offset)

    return interval_work(callable, *args, eval = next_time, **kwargs)
def seconds_eval(offset, now = None):
    """
    Computes the timestamp that sits the provided number of
    seconds after the reference time.

    :type offset: float
    :param offset: The number of seconds to add to the reference time.
    :type now: datetime
    :param now: The reference time, the current UTC time when not provided.
    :rtype: int
    :return: The resulting UTC timestamp (whole seconds).
    """

    if not now: now = datetime.datetime.utcnow()
    target = now + datetime.timedelta(seconds = offset)
    return calendar.timegm(target.utctimetuple())
def minutes_eval(offset, now = None):
    """
    Computes the timestamp of the start of the minute that follows
    the reference time, shifted by the provided offset.

    :type offset: float
    :param offset: The number of seconds to add to the start of the minute.
    :type now: datetime
    :param now: The reference time, the current UTC time when not provided.
    :rtype: int
    :return: The resulting UTC timestamp (whole seconds).
    """

    if not now: now = datetime.datetime.utcnow()

    # truncates the reference time to the start of its minute
    minute_start = datetime.datetime(now.year, now.month, now.day, now.hour, now.minute)
    step = datetime.timedelta(minutes = 1, seconds = offset)
    return calendar.timegm((minute_start + step).utctimetuple())
def hourly_eval(offset, now = None):
    """
    Computes the timestamp of the start of the hour that follows
    the reference time, shifted by the provided offset.

    :type offset: float
    :param offset: The number of seconds to add to the start of the hour.
    :type now: datetime
    :param now: The reference time, the current UTC time when not provided.
    :rtype: int
    :return: The resulting UTC timestamp (whole seconds).
    """

    if not now: now = datetime.datetime.utcnow()

    # truncates the reference time to the start of its hour
    hour_start = datetime.datetime(now.year, now.month, now.day, now.hour)
    step = datetime.timedelta(hours = 1, seconds = offset)
    return calendar.timegm((hour_start + step).utctimetuple())
def daily_eval(offset, now = None):
    """
    Computes the timestamp of the start of the day that follows
    the reference time, shifted by the provided offset.

    :type offset: float
    :param offset: The number of seconds to add to the start of the day.
    :type now: datetime
    :param now: The reference time, the current UTC time when not provided.
    :rtype: int
    :return: The resulting UTC timestamp (whole seconds).
    """

    if not now: now = datetime.datetime.utcnow()

    # truncates the reference time to the start of its day
    day_start = datetime.datetime(now.year, now.month, now.day)
    step = datetime.timedelta(days = 1, seconds = offset)
    return calendar.timegm((day_start + step).utctimetuple())
def weekly_eval(weekday, offset, now = None):
    """
    Computes the timestamp of the next occurrence of the provided
    weekday (start of the day shifted by the offset) that is not
    before the reference time.

    :type weekday: int
    :param weekday: The target day of the week, using the numbering\
    of ``datetime.weekday`` (monday is zero).
    :type offset: float
    :param offset: The number of seconds to add to the start of the day.
    :type now: datetime
    :param now: The reference time, the current UTC time when not provided.
    :rtype: int
    :return: The resulting UTC timestamp (whole seconds).
    """

    if not now: now = datetime.datetime.utcnow()
    day_start = datetime.datetime(now.year, now.month, now.day)

    # number of days (zero to six) until the target weekday
    days_ahead = (weekday - day_start.weekday()) % 7
    target = day_start + datetime.timedelta(days = days_ahead, seconds = offset)

    # an occurrence that has already passed moves to the following week
    if target < now: target = target + datetime.timedelta(days = 7)
    return calendar.timegm(target.utctimetuple())
def monthly_eval(monthday, offset, now = None):
    """
    Computes the timestamp of the next occurrence of the provided
    day of the month (start of the day shifted by the offset) that
    is not before the reference time.

    The search starts at the month of the reference time and moves
    forward one month at a time, always rebuilding the date from the
    requested day, so the result stays correct when the offset moves
    the target across a day boundary (eg: negative offsets).

    :type monthday: int
    :param monthday: The target day of the month (starting at one).
    :type offset: float
    :param offset: The number of seconds to add to the start of the day.
    :type now: datetime
    :param now: The reference time, the current UTC time when not provided.
    :rtype: int
    :return: The resulting UTC timestamp (whole seconds).
    :raise ValueError: In case the day does not exist in one of the\
    months evaluated (eg: day 31 in a month with 30 days).
    """

    now = now or datetime.datetime.utcnow()
    delta = datetime.timedelta(seconds = offset)
    year, month = now.year, now.month

    while True:
        # the candidate is always built from the original day of the
        # month, never from a day already shifted by the offset
        target = datetime.datetime(year = year, month = month, day = monthday) + delta
        if target >= now: break

        # the candidate has already passed, tries the following month
        if month == 12: year, month = year + 1, 1
        else: month += 1

    return calendar.timegm(target.utctimetuple())
def build_composed(callable, target_time, interval, eval, callback):
    """
    Builds the wrapper that runs the callable and then inserts the
    following execution into the work queue, the rescheduling takes
    place even when the callable raises an exception.

    :type callable: Function
    :param callable: The callable object to be run by the wrapper.
    :type target_time: float
    :param target_time: The timestamp the current execution was\
    scheduled for, base of the interval calculus.
    :type interval: float
    :param interval: The number of seconds between executions, only\
    used when ``eval`` is not provided.
    :type eval: Function
    :param eval: The optional function (no arguments) that returns the\
    timestamp of the next execution.
    :type callback: Function
    :param callback: The callback forwarded with every inserted work unit.
    :rtype: Function
    :return: The wrapper function to be used as the work unit.
    """

    def composed(*args, **kwargs):
        try:
            # runs the wrapped callable with the received arguments
            result = callable(*args, **kwargs)
        finally:
            if eval:
                # the evaluation function, when present, is the one
                # that defines the timing of the next execution
                next_time = eval()
            else:
                # while the execution finished less than one interval
                # after its target the next target stays aligned with
                # the original schedule, otherwise it restarts from the
                # current time (avoiding queue starvation)
                finished = time.time()
                on_schedule = finished - target_time < interval
                next_time = (target_time if on_schedule else finished) + interval

            # a new wrapper carrying the updated target time is inserted
            # into the queue as the following unit of work
            follow_up = build_composed(callable, next_time, interval, eval, callback)
            insert_work(
                follow_up,
                args = args,
                kwargs = kwargs,
                target_time = next_time,
                callback = callback
            )

        # the result of the wrapped callable is handed to the caller
        return result

    return composed