forked from celery/celery
-
Notifications
You must be signed in to change notification settings - Fork 40
/
database.py
96 lines (84 loc) · 3.23 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from datetime import datetime
from datetime import timedelta

from celery import conf
from celery.backends.base import BaseDictBackend
from celery.db.models import Task, TaskSet
from celery.db.session import ResultSession
from celery.exceptions import ImproperlyConfigured


class DatabaseBackend(BaseDictBackend):
    """The database result backend.

    Stores task and taskset results as ``Task`` / ``TaskSet`` rows using
    sessions obtained from :func:`celery.db.session.ResultSession`.
    Every method opens its own session and always closes it, even when
    the database operation raises.
    """

    def __init__(self, dburi=None, result_expires=None,
            engine_options=None, **kwargs):
        """Configure the backend.

        :keyword dburi: Database connection string.  Falls back to
            ``conf.RESULT_DBURI`` when not given (or falsy).
        :keyword result_expires: Age after which :meth:`cleanup` deletes
            stored results.  Falls back to ``conf.TASK_RESULT_EXPIRES``
            when not given (or falsy).  :meth:`cleanup` accepts either a
            ``timedelta`` or a plain number of seconds.
        :keyword engine_options: Extra keyword arguments passed on to
            ``ResultSession``.  They are merged with
            ``conf.RESULT_ENGINE_OPTIONS``; on duplicate keys the
            configured value wins.
        :keyword \*\*kwargs: Passed through to the base class.

        :raises ImproperlyConfigured: if no connection string is available.

        """
        self.result_expires = result_expires or conf.TASK_RESULT_EXPIRES
        self.dburi = dburi or conf.RESULT_DBURI
        # ``dict(a, **b)``: entries of ``b`` (the configuration) override
        # entries of ``a`` (the constructor argument).
        self.engine_options = dict(engine_options or {},
                                   **conf.RESULT_ENGINE_OPTIONS or {})
        if not self.dburi:
            raise ImproperlyConfigured(
                    "Missing connection string! Do you have "
                    "CELERY_RESULT_DBURI set to a real value?")

        super(DatabaseBackend, self).__init__(**kwargs)

    def ResultSession(self):
        """Return a new session for this backend's database.

        Inside this method the name ``ResultSession`` resolves to the
        module-level factory, not to this method.
        """
        return ResultSession(dburi=self.dburi, **self.engine_options)

    def _store_result(self, task_id, result, status, traceback=None):
        """Store return value and status of an executed task.

        Updates the existing ``Task`` row for ``task_id``, creating it
        first when none exists.  Returns ``result`` unchanged.
        """
        session = self.ResultSession()
        try:
            task = session.query(Task).filter(Task.task_id == task_id).first()
            if not task:
                task = Task(task_id)
                session.add(task)
                session.flush()
            task.result = result
            task.status = status
            task.traceback = traceback
            session.commit()
        finally:
            session.close()
        return result

    def _save_taskset(self, taskset_id, result):
        """Store the result of an executed taskset.

        Adds a new ``TaskSet`` row and returns ``result`` unchanged.
        """
        taskset = TaskSet(taskset_id, result)
        session = self.ResultSession()
        try:
            session.add(taskset)
            session.flush()
            session.commit()
        finally:
            session.close()
        return result

    def _get_task_meta_for(self, task_id):
        """Get task metadata for a task by id.

        Returns ``task.to_dict()`` for the matching ``Task`` row.  When no
        row exists yet, a fresh ``Task(task_id)`` is inserted and committed
        first, so a lookup of an unknown id writes to the database.
        """
        session = self.ResultSession()
        try:
            task = session.query(Task).filter(Task.task_id == task_id).first()
            if not task:
                task = Task(task_id)
                session.add(task)
                session.flush()
                session.commit()
            # ``task`` is always a row object here: either the one found
            # or the one just created.
            return task.to_dict()
        finally:
            session.close()

    def _restore_taskset(self, taskset_id):
        """Get taskset metadata for a taskset by id.

        Returns ``taskset.to_dict()`` for the matching ``TaskSet`` row, or
        ``None`` when there is no such row.
        """
        session = self.ResultSession()
        try:
            qs = session.query(TaskSet)
            taskset = qs.filter(TaskSet.taskset_id == taskset_id).first()
            if taskset:
                return taskset.to_dict()
        finally:
            session.close()

    def cleanup(self):
        """Delete expired metadata.

        Removes every ``Task`` and ``TaskSet`` row whose ``date_done`` is
        older than ``datetime.now() - self.result_expires`` and commits.
        ``result_expires`` may be a ``timedelta`` or a plain number of
        seconds.
        """
        # Resolve the expiry before opening a session so a bad value
        # cannot leave a session unclosed.
        expires = self.result_expires
        if isinstance(expires, (int, float)):
            expires = timedelta(seconds=expires)

        session = self.ResultSession()
        try:
            # One cutoff for both tables, so tasks and tasksets are
            # purged against the same point in time.
            cutoff = datetime.now() - expires
            session.query(Task).filter(Task.date_done < cutoff).delete()
            session.query(TaskSet).filter(TaskSet.date_done < cutoff).delete()
            session.commit()
        finally:
            session.close()