-
Notifications
You must be signed in to change notification settings - Fork 16
/
background_helper.py
179 lines (139 loc) · 5.56 KB
/
background_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
""" collections of wrapper function for helping you to create BackgroundTask
~~BackgroundTasks:Feature~~
"""
import inspect
import pkgutil
from typing import Callable, Type, Iterable, Tuple
from huey import RedisHuey
import portality.tasks
from portality import models, constants
from portality.background import BackgroundApi, BackgroundTask
from portality.core import app
from portality.decorators import write_required
from portality.tasks.redis_huey import long_running, main_queue, configure, schedule
# Signature of a factory that builds the BackgroundTask for a given BackgroundJob
TaskFactory = Callable[[models.BackgroundJob], BackgroundTask]
# Cache of action name -> queue id; stays None until lookup_queue_for_action() fills it on first use
_queue_for_action = None
def get_queue_id_by_task_queue(task_queue: RedisHuey):
    """ Translate a huey queue instance into its BGJOB_QUEUE_ID_* constant

    :param task_queue: queue to identify, may be None
    :return: BGJOB_QUEUE_ID_LONG or BGJOB_QUEUE_ID_MAIN when the queue's name equals the name of
        `long_running` or `main_queue` respectively; BGJOB_QUEUE_ID_UNKNOWN for None or any
        other queue (the latter case is also logged as a warning)
    """
    if task_queue is None:
        return constants.BGJOB_QUEUE_ID_UNKNOWN

    # queues are matched by their name, not by object identity
    if task_queue.name == long_running.name:
        return constants.BGJOB_QUEUE_ID_LONG
    if task_queue.name == main_queue.name:
        return constants.BGJOB_QUEUE_ID_MAIN

    # Logger.warn() is a deprecated alias; warning() is the supported spelling
    app.logger.warning(f'unknown task_queue[{task_queue}]')
    return constants.BGJOB_QUEUE_ID_UNKNOWN
def create_job(username, action,
               queue_id=constants.BGJOB_QUEUE_ID_UNKNOWN,
               task_queue: RedisHuey = None,
               params=None):
    """ Common way to create BackgroundJob

    :param username: stored on the job as `user`
    :param action: stored on the job as `action`
    :param queue_id: queue id to record; only used when `task_queue` is not given
    :param task_queue: when given, the queue id is derived from it and replaces `queue_id`
    :param params: stored on the job as `params` unless it is None
    :return: the populated BackgroundJob; it is not saved by this function
    """
    new_job = models.BackgroundJob()
    new_job.user = username
    new_job.action = action
    if params is not None:
        new_job.params = params

    # an explicit queue object takes precedence over the plain queue_id argument
    new_job.queue_id = queue_id if task_queue is None else get_queue_id_by_task_queue(task_queue)
    return new_job
def submit_by_bg_task_type(background_task: Type[BackgroundTask], **prepare_kwargs):
    """ Common way to submit task by BackgroundTask Class

    The job is prepared for the user configured as SYSTEM_USERNAME and then handed to
    the class's own `submit`.

    :param background_task: BackgroundTask class providing `prepare` and `submit`
    :param prepare_kwargs: forwarded unchanged to `background_task.prepare`
    """
    system_user = app.config.get("SYSTEM_USERNAME")
    prepared_job = background_task.prepare(system_user, **prepare_kwargs)
    background_task.submit(prepared_job)
def execute_by_job_id(job_id, task_factory: TaskFactory):
    """ Common way to execute BackgroundTask by job_id

    :param job_id: id passed to `models.BackgroundJob.pull`
    :param task_factory: callable that builds the BackgroundTask from the pulled job
    """
    pulled_job = models.BackgroundJob.pull(job_id)
    bg_task = task_factory(pulled_job)
    BackgroundApi.execute(bg_task)
def execute_by_bg_task_type(bg_task_type: Type[BackgroundTask], **prepare_kwargs):
    """ Prepare a job for the given BackgroundTask class and execute it right away

    Unlike `submit_by_bg_task_type`, the task is run through `BackgroundApi.execute`
    in the calling code path instead of being submitted.

    :param bg_task_type: BackgroundTask class providing `prepare`; also used to build the task
    :param prepare_kwargs: forwarded unchanged to `bg_task_type.prepare`
    :return: the task instance that was executed
    """
    system_user = app.config.get("SYSTEM_USERNAME")
    prepared_job = bg_task_type.prepare(system_user, **prepare_kwargs)
    bg_task = bg_task_type(prepared_job)
    BackgroundApi.execute(bg_task)
    return bg_task
class RedisHueyTaskHelper:
    """ Pairs a huey queue with a task name and registers functions on that queue """

    def __init__(self, task_queue: RedisHuey, task_name: str):
        self.task_queue = task_queue
        self.task_name = task_name

    @property
    def queue_id(self):
        """ BGJOB_QUEUE_ID_* constant matching `self.task_queue` """
        return get_queue_id_by_task_queue(self.task_queue)

    def register_schedule(self, fn):
        """ Register `fn` as a periodic task on the queue

        `fn` is first wrapped with `write_required(script=True)`; the period comes from
        `schedule(self.task_name)`.

        :return: the object produced by the queue's `periodic_task` decorator
        """
        guarded_fn = write_required(script=True)(fn)
        as_periodic = self.task_queue.periodic_task(schedule(self.task_name))
        return as_periodic(guarded_fn)

    def register_execute(self, is_load_config=False):
        """ Build a decorator that registers a function as a task on the queue

        :param is_load_config: when true, the keyword arguments for the queue's `task()`
            come from `configure(self.task_name)`; otherwise `task()` is called without any
        :return: decorator taking the function to register
        """

        def decorate(fn):
            # the config is resolved when the decorator is applied, not when it is created
            task_conf = configure(self.task_name) if is_load_config else {}
            guarded_fn = write_required(script=True)(fn)
            return self.task_queue.task(**task_conf)(guarded_fn)

        return decorate
def _get_background_task_spec(module):
    """ Inspect a task module for its queue, action name and BackgroundTask class

    Walks the module members in the order given by `inspect.getmembers` and returns as soon
    as a RedisHuey instance and a BackgroundTask subclass with a truthy `__action__` have
    both been seen.

    :param module: loaded module to inspect
    :return: tuple (queue_id, task_name, bg_class), or None when the module does not
        provide all three
    """
    found_queue_id = found_action = found_class = None

    for _member_name, candidate in inspect.getmembers(module):
        if isinstance(candidate, RedisHuey):
            found_queue_id = get_queue_id_by_task_queue(candidate)
        elif (
            inspect.isclass(candidate)
            and issubclass(candidate, BackgroundTask)
            and candidate != BackgroundTask
        ):
            # the base class itself is excluded, only concrete subclasses count
            found_action = getattr(candidate, '__action__', None)
            found_class = candidate

        if found_queue_id and found_action and found_class:
            return found_queue_id, found_action, found_class

    return None
def lookup_queue_for_action(action):
    """ Find which queue an action is registered to, by action name

    The action -> queue id mapping is built by inspecting the background tasks and is
    stored in a module-level singleton to reduce work on later calls.

    :param action: action name to look up
    :return: the queue id recorded for the action, or BGJOB_QUEUE_ID_UNKNOWN when the
        action is not in the mapping
    """
    global _queue_for_action

    if _queue_for_action is None:
        all_specs = get_all_background_task_specs()
        _queue_for_action = {
            spec_action: spec_queue
            for spec_queue, spec_action, _spec_class in all_specs
        }

    return _queue_for_action.get(action, constants.BGJOB_QUEUE_ID_UNKNOWN)
def get_all_background_task_specs() -> Iterable[Tuple[str, str, Type]]:
    """ Load the non-package modules found under `portality.tasks` and collect their task specs

    Modules whose loading fails with a RuntimeError mentioning
    'No configuration for scheduled action' are logged and skipped; any other RuntimeError
    propagates. Modules for which `_get_background_task_spec` returns None are skipped too.

    :return: lazy, single-pass iterable of (queue_id, task_name, task_class); the modules are
        only loaded while the result is being iterated
    """

    def _load_bgtask_safe(_mi):
        try:
            # NOTE(review): Loader.load_module() is deprecated in the importlib docs; left
            # unchanged because a replacement may alter how the module is named/registered
            # -- confirm before migrating
            return _mi.module_finder.find_spec(_mi.name).loader.load_module(_mi.name)
        except RuntimeError as e:
            if 'No configuration for scheduled action' in str(e):
                # Logger.warn() is a deprecated alias; warning() is the supported spelling
                app.logger.warning(f'config for {_mi.name} not found')
                return None
            # anything else is unexpected: re-raise the active exception unchanged
            raise

    module_infos = (m for m in pkgutil.walk_packages(portality.tasks.__path__) if not m.ispkg)
    modules = (_load_bgtask_safe(mi) for mi in module_infos)
    modules = filter(None, modules)
    bgspec_list = map(_get_background_task_spec, modules)
    bgspec_list = filter(None, bgspec_list)
    return bgspec_list
def get_value_safe(key, default_v, kwargs, default_cond_fn=None):
    """ Read `key` from `kwargs`, falling back to `default_v` when the value is rejected

    :param key: key to look up in `kwargs`
    :param default_v: value used when the key is missing or the found value is rejected
    :param kwargs: mapping to read from
    :param default_cond_fn: predicate deciding whether a value must be replaced by
        `default_v`; when not supplied, only None is replaced
    :return: the value found in `kwargs`, or `default_v`
    """
    found = kwargs.get(key, default_v)

    if not default_cond_fn:
        use_default = found is None
    else:
        # the predicate also sees default_v itself when the key is missing
        use_default = default_cond_fn(found)

    return default_v if use_default else found
def submit_by_background_job(background_job, execute_fn, delay=10):
    """ Common way of `BackgroundTask.submit`

    Saves the job first, then schedules `execute_fn` with the job id as its only argument.

    :param background_job: job object providing `save()` and `id`
    :param execute_fn: task object providing `schedule(args=..., delay=...)`
    :param delay: value handed to `schedule` as `delay` (seconds for a huey task);
        defaults to 10, the value that used to be hard-coded
    """
    background_job.save()
    execute_fn.schedule(args=(background_job.id,), delay=delay)
def create_execute_fn(redis_huey, task_factory: TaskFactory):
    """ Build a task on `redis_huey` that executes a job by id through `task_factory`

    :param redis_huey: queue object whose `task()` decorator registers the function
    :param task_factory: callable that builds the BackgroundTask from the pulled job
    :return: the object produced by `redis_huey.task()`; it accepts a single `job_id`
    """
    # NOTE(review): no name= is passed to task(), so every call registers a function named
    # `_execute_fn` -- confirm repeated calls on the same queue do not collide

    def _execute_fn(job_id):
        execute_by_job_id(job_id, task_factory)

    # same wrapping order as stacked decorators: write_required innermost, huey task outermost
    as_task = redis_huey.task()
    return as_task(write_required(script=True)(_execute_fn))