forked from lilspikey/django-background-task
/
models.py
331 lines (285 loc) · 11.6 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# -*- coding: utf-8 -*-
from datetime import timedelta
from hashlib import sha1
import json
import logging
import os
import traceback
from compat import StringIO
from compat.models import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.db.models import Q
from django.utils import timezone
from django.utils.six import python_2_unicode_compatible
from background_task.exceptions import InvalidTaskError
from background_task.settings import app_settings
from background_task.signals import task_failed
from background_task.signals import task_rescheduled
# Module-level logger; Task.reschedule uses it to report failed and rescheduled tasks.
logger = logging.getLogger(__name__)
class TaskQuerySet(models.QuerySet):
    """QuerySet for Task adding a filter on the generic ``creator`` relation."""

    def created_by(self, creator):
        """
        Filter tasks by the model instance recorded as their ``creator``.

        :param creator: A saved model instance that was passed as ``creator``
            when the tasks were created.
        :return: A Task queryset filtered by creator
        """
        content_type = ContentType.objects.get_for_model(creator)
        # Use ``pk`` rather than ``id``: the generic foreign key stores the
        # creator's primary key, which need not be a field named ``id``.
        return self.filter(
            creator_content_type=content_type,
            creator_object_id=creator.pk,
        )
class TaskManager(models.Manager):
    """
    Default manager for Task.

    Provides querysets by creator, lock state and availability, plus helpers
    to create, look up and drop tasks identified by name + arguments.
    """

    def get_queryset(self):
        return TaskQuerySet(self.model, using=self._db)

    def created_by(self, creator):
        """:return: Tasks whose generic ``creator`` is ``creator``."""
        return self.get_queryset().created_by(creator)

    def find_available(self, queue=None):
        """
        Return tasks that are ready to run now.

        A task is ready when it is unlocked (see ``unlocked``), its ``run_at``
        is not in the future and it has not failed. Results are ordered by
        priority (direction taken from BACKGROUND_TASK_PRIORITY_ORDERING) and
        then by ``run_at``.

        :param queue: If given, only tasks on this queue are considered.
        :return: A queryset; when BACKGROUND_TASK_RUN_ASYNC is enabled it is
            sliced to the number of free worker threads (or empty if none).
        """
        now = timezone.now()
        qs = self.unlocked(now)
        if queue:
            qs = qs.filter(queue=queue)
        ready = qs.filter(run_at__lte=now, failed_at=None)
        _priority_ordering = '{}priority'.format(app_settings.BACKGROUND_TASK_PRIORITY_ORDERING)
        ready = ready.order_by(_priority_ordering, 'run_at')

        if app_settings.BACKGROUND_TASK_RUN_ASYNC:
            currently_failed = self.failed().count()
            currently_locked = self.locked(now).count()
            count = app_settings.BACKGROUND_TASK_ASYNC_THREADS - (
                currently_locked - currently_failed
            )
            if count > 0:
                ready = ready[:count]
            else:
                ready = self.none()
        return ready

    def unlocked(self, now):
        """
        :return: Tasks not held by any worker: ``locked_by`` is empty, or the
            lock is older than BACKGROUND_TASK_MAX_RUN_TIME seconds.
        """
        max_run_time = app_settings.BACKGROUND_TASK_MAX_RUN_TIME
        qs = self.get_queryset()
        expires_at = now - timedelta(seconds=max_run_time)
        unlocked = Q(locked_by=None) | Q(locked_at__lt=expires_at)
        return qs.filter(unlocked)

    def locked(self, now):
        """
        :return: Tasks currently held by a worker: ``locked_by`` is set AND
            the lock was taken less than BACKGROUND_TASK_MAX_RUN_TIME seconds
            before ``now``.
        """
        max_run_time = app_settings.BACKGROUND_TASK_MAX_RUN_TIME
        qs = self.get_queryset()
        expires_at = now - timedelta(seconds=max_run_time)
        # Both conditions must hold. A task whose lock has expired is returned
        # by ``unlocked`` as available and must not also be counted as locked,
        # otherwise ``find_available`` undercounts the free thread slots.
        locked = Q(locked_by__isnull=False) & Q(locked_at__gt=expires_at)
        return qs.filter(locked)

    def failed(self):
        """
        :return: Tasks that have a ``failed_at`` timestamp.

        `currently_locked - currently_failed` in `find_available` assumes that
        tasks marked as failed are also in processing by the running PID.
        """
        qs = self.get_queryset()
        return qs.filter(failed_at__isnull=False)

    @staticmethod
    def _params_and_hash(task_name, args, kwargs):
        """Return ``(task_params_json, sha1_hexdigest)`` identifying a call."""
        args = args or ()
        kwargs = kwargs or {}
        # sort_keys makes the JSON (and therefore the hash) independent of
        # kwargs insertion order.
        task_params = json.dumps((args, kwargs), sort_keys=True)
        s = "%s%s" % (task_name, task_params)
        return task_params, sha1(s.encode('utf-8')).hexdigest()

    def new_task(self, task_name, args=None, kwargs=None,
                 run_at=None, priority=0, queue=None, verbose_name=None,
                 creator=None, repeat=None, repeat_until=None,
                 remove_existing_tasks=False):
        """
        Build (but do not save) a new Task for ``task_name(*args, **kwargs)``.

        ``run_at`` defaults to now and ``repeat`` to ``Task.NEVER``.
        If `remove_existing_tasks` is True, all unlocked tasks with the identical task hash will be removed.
        The attributes `repeat` and `repeat_until` are not supported at the moment.
        """
        if run_at is None:
            run_at = timezone.now()
        task_params, task_hash = self._params_and_hash(task_name, args, kwargs)
        if remove_existing_tasks:
            # Only tasks without a lock timestamp are removed.
            Task.objects.filter(task_hash=task_hash, locked_at__isnull=True).delete()
        return Task(task_name=task_name,
                    task_params=task_params,
                    task_hash=task_hash,
                    priority=priority,
                    run_at=run_at,
                    queue=queue,
                    verbose_name=verbose_name,
                    creator=creator,
                    repeat=repeat or Task.NEVER,
                    repeat_until=repeat_until,
                    )

    def get_task(self, task_name, args=None, kwargs=None):
        """:return: Tasks whose name + arguments hash matches this call."""
        _, task_hash = self._params_and_hash(task_name, args, kwargs)
        qs = self.get_queryset()
        return qs.filter(task_hash=task_hash)

    def drop_task(self, task_name, args=None, kwargs=None):
        """Delete all tasks matching ``get_task(task_name, args, kwargs)``."""
        return self.get_task(task_name, args, kwargs).delete()
@python_2_unicode_compatible
class Task(models.Model):
    """
    A scheduled background task.

    Stores a task name with its JSON-encoded arguments, together with
    scheduling, retry, repetition, locking and creator bookkeeping.
    """

    # the "name" of the task/function to be run
    task_name = models.CharField(max_length=190, db_index=True)
    # the json encoded parameters to pass to the task
    task_params = models.TextField()
    # a sha1 hash of the name and params, to lookup already scheduled tasks
    task_hash = models.CharField(max_length=40, db_index=True)

    verbose_name = models.CharField(max_length=255, null=True, blank=True)

    # what priority the task has
    priority = models.IntegerField(default=0, db_index=True)
    # when the task should be run
    run_at = models.DateTimeField(db_index=True)

    # Repeat choices are encoded as number of seconds
    # The repeat implementation is based on this encoding
    HOURLY = 3600
    DAILY = 24 * HOURLY
    WEEKLY = 7 * DAILY
    EVERY_2_WEEKS = 2 * WEEKLY
    EVERY_4_WEEKS = 4 * WEEKLY
    NEVER = 0
    REPEAT_CHOICES = (
        (HOURLY, 'hourly'),
        (DAILY, 'daily'),
        (WEEKLY, 'weekly'),
        (EVERY_2_WEEKS, 'every 2 weeks'),
        (EVERY_4_WEEKS, 'every 4 weeks'),
        (NEVER, 'never'),
    )
    repeat = models.BigIntegerField(choices=REPEAT_CHOICES, default=NEVER)
    repeat_until = models.DateTimeField(null=True, blank=True)

    # the "name" of the queue this is to be run on
    queue = models.CharField(max_length=190, db_index=True,
                             null=True, blank=True)

    # how many times the task has been tried
    attempts = models.IntegerField(default=0, db_index=True)
    # when the task last failed
    failed_at = models.DateTimeField(db_index=True, null=True, blank=True)
    # details of the error that occurred
    last_error = models.TextField(blank=True)

    # details of who's trying to run the task at the moment
    locked_by = models.CharField(max_length=64, db_index=True,
                                 null=True, blank=True)
    locked_at = models.DateTimeField(db_index=True, null=True, blank=True)

    # optional generic reference to the object that created this task
    creator_content_type = models.ForeignKey(
        ContentType, null=True, blank=True,
        related_name='background_task', on_delete=models.CASCADE
    )
    creator_object_id = models.PositiveIntegerField(null=True, blank=True)
    creator = GenericForeignKey('creator_content_type', 'creator_object_id')

    objects = TaskManager()

    def locked_by_pid_running(self):
        """
        Check if the process whose PID is stored in ``locked_by`` is still running.

        :return: ``None`` when the task is not locked; ``True`` if a process
            with that PID exists; ``False`` if it does not, or if
            ``locked_by`` cannot be interpreted as a PID.
        """
        import errno

        if not self.locked_by:
            return None
        try:
            # Signal 0 only performs the existence/permission check; it does
            # not deliver a signal, so the process is not affected.
            os.kill(int(self.locked_by), 0)
        except OSError as exc:
            # EPERM: the process exists but belongs to another user, so it is
            # still running. Anything else (e.g. ESRCH) means it is gone.
            return exc.errno == errno.EPERM
        except (ValueError, OverflowError):
            # locked_by is not a usable integer PID.
            return False
        return True
    locked_by_pid_running.boolean = True

    def has_error(self):
        """
        Check if the last_error field is empty.
        """
        return bool(self.last_error)
    has_error.boolean = True

    def params(self):
        """
        Decode ``task_params``.

        :return: ``(args, kwargs)`` with every kwargs key coerced to ``str``.
        """
        args, kwargs = json.loads(self.task_params)
        # need to coerce kwargs keys to str
        kwargs = {str(k): v for k, v in kwargs.items()}
        return args, kwargs

    def lock(self, locked_by):
        """
        Atomically lock this task for ``locked_by`` if it is currently unlocked.

        :return: A freshly loaded Task on success, or ``None`` if another
            worker already holds a valid lock.
        """
        now = timezone.now()
        unlocked = Task.objects.unlocked(now).filter(pk=self.pk)
        # A single conditional UPDATE, so two workers cannot both succeed.
        updated = unlocked.update(locked_by=locked_by, locked_at=now)
        if updated:
            return Task.objects.get(pk=self.pk)
        return None

    def _extract_error(self, type, err, tb):
        """Format the exception ``(type, err, tb)`` as a traceback string."""
        file = StringIO()
        traceback.print_exception(type, err, tb, None, file)
        return file.getvalue()

    def increment_attempts(self):
        """Increase ``attempts`` by one and save the task."""
        self.attempts += 1
        self.save()

    def has_reached_max_attempts(self):
        """:return: True once ``attempts`` reaches BACKGROUND_TASK_MAX_ATTEMPTS."""
        max_attempts = app_settings.BACKGROUND_TASK_MAX_ATTEMPTS
        return self.attempts >= max_attempts

    def is_repeating_task(self):
        """:return: True if ``repeat`` is a positive interval (not NEVER)."""
        return self.repeat > self.NEVER

    def reschedule(self, type, err, traceback):
        '''
        Set a new time to run the task in future, or create a CompletedTask and delete the Task
        if it has reached the maximum of allowed attempts

        ``type``, ``err`` and ``traceback`` are the exception type, value and
        traceback of the failed run. (The ``traceback`` parameter shadows the
        module of the same name inside this method only.)

        Failure path (max attempts reached, or ``err`` is an
        InvalidTaskError): record ``failed_at``, copy into a CompletedTask,
        send ``task_failed`` and delete this Task.
        Retry path: push ``run_at`` back by ``attempts ** 4 + 5`` seconds, send
        ``task_rescheduled``, release the lock and save.
        '''
        self.last_error = self._extract_error(type, err, traceback)
        self.increment_attempts()
        if self.has_reached_max_attempts() or isinstance(err, InvalidTaskError):
            self.failed_at = timezone.now()
            logger.warning('Marking task %s as failed', self)
            completed = self.create_completed_task()
            task_failed.send(sender=self.__class__, task_id=self.id, completed_task=completed)
            # The CompletedTask now holds the record. Do not save() after
            # this point: saving a deleted instance would write it back.
            self.delete()
        else:
            backoff = timedelta(seconds=(self.attempts ** 4) + 5)
            self.run_at = timezone.now() + backoff
            logger.warning('Rescheduling task %s for %s later at %s', self,
                           backoff, self.run_at)
            task_rescheduled.send(sender=self.__class__, task=self)
            # Release the lock so any worker can pick the task up at run_at.
            self.locked_by = None
            self.locked_at = None
            self.save()

    def create_completed_task(self):
        '''
        Returns a new CompletedTask instance with the same values

        The CompletedTask is saved before being returned; its ``run_at`` is
        set to the current time rather than copied.
        '''
        # Imported here rather than at module level; presumably to avoid a
        # circular import with models_completed (TODO confirm).
        from background_task.models_completed import CompletedTask
        completed_task = CompletedTask(
            task_name=self.task_name,
            task_params=self.task_params,
            task_hash=self.task_hash,
            priority=self.priority,
            run_at=timezone.now(),
            queue=self.queue,
            attempts=self.attempts,
            failed_at=self.failed_at,
            last_error=self.last_error,
            locked_by=self.locked_by,
            locked_at=self.locked_at,
            verbose_name=self.verbose_name,
            creator=self.creator,
            repeat=self.repeat,
            repeat_until=self.repeat_until,
        )
        completed_task.save()
        return completed_task

    def create_repetition(self):
        """
        :return: A new Task with an offset of self.repeat, or None if the self.repeat_until is reached

        The new ``run_at`` is ``self.run_at`` advanced by whole multiples of
        ``repeat`` until it is no longer in the past, so missed slots are
        skipped rather than replayed. The new Task is saved before returning.
        """
        if not self.is_repeating_task():
            return None

        now = timezone.now()
        if self.repeat_until and self.repeat_until <= now:
            # Repeat chain completed
            return None

        args, kwargs = self.params()
        step = timedelta(seconds=self.repeat)
        new_run_at = self.run_at + step
        while new_run_at < now:
            new_run_at += step

        new_task = Task.objects.new_task(
            task_name=self.task_name,
            args=args,
            kwargs=kwargs,
            run_at=new_run_at,
            priority=self.priority,
            queue=self.queue,
            verbose_name=self.verbose_name,
            creator=self.creator,
            repeat=self.repeat,
            repeat_until=self.repeat_until,
        )
        new_task.save()
        return new_task

    def save(self, *arg, **kw):
        # force NULL rather than empty string
        self.locked_by = self.locked_by or None
        return super(Task, self).save(*arg, **kw)

    def __str__(self):
        return u'{}'.format(self.verbose_name or self.task_name)

    class Meta:
        db_table = 'background_task'