/
celery_utils.py
45 lines (34 loc) · 1.45 KB
/
celery_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from celery import Celery
from microsetta_private_api.config_manager import SERVER_CONFIG
PACKAGE = __name__.split('.')[0]
CELERY_BACKEND_URI = 'celery_backend_uri'
CELERY_BROKER_URI = 'celery_broker_uri'
# derived from
# https://medium.com/@frassetto.stefano/flask-celery-howto-d106958a15fe
def init_celery(celery, app):
celery.conf.update(app.config)
celery.conf.task_default_queue = 'microsetta-private-api'
TaskBase = celery.Task
class ContextTask(TaskBase):
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
celery.autodiscover_tasks([PACKAGE])
# Set up "Celery Beat", events that run on a fixed time interval
celery.conf.beat_schedule = {
# Vioscreen tokens are good for an hour, refresh every 55 minutes
"refresh_vioscreen_token": {
"task": "microsetta_private_api.util.vioscreen.refresh_headers",
"schedule": 55 * 60
},
"update_vioscreen_sessions": {
"task": "microsetta_private_api.util.vioscreen.update_session_detail", # noqa
"schedule": 60 * 60 * 24 # every 24 hours
}
}
def make_celery(app_name):
celery_backend = SERVER_CONFIG[CELERY_BACKEND_URI]
celery_broker = SERVER_CONFIG[CELERY_BROKER_URI]
return Celery(app_name, backend=celery_backend, broker=celery_broker)
celery = make_celery(PACKAGE)