Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add attempt_count to limit the number of times a filed task will be re-attempted #466

Merged
merged 7 commits into from Aug 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions django_q/cluster.py
Expand Up @@ -478,7 +478,12 @@ def save_task(task, broker: Broker):
existing_task.stopped = task["stopped"]
existing_task.result = task["result"]
existing_task.success = task["success"]
existing_task.attempt_count = existing_task.attempt_count + 1
existing_task.save()

if Conf.MAX_ATTEMPTS > 0 and existing_task.attempt_count >= Conf.MAX_ATTEMPTS:
broker.acknowledge(task['ack_id'])

else:
Task.objects.create(
id=task["id"],
Expand All @@ -492,6 +497,7 @@ def save_task(task, broker: Broker):
result=task["result"],
group=task.get("group"),
success=task["success"],
attempt_count=1
)
except Exception as e:
logger.error(e)
Expand Down
3 changes: 3 additions & 0 deletions django_q/conf.py
Expand Up @@ -164,6 +164,9 @@ class Conf:
# Optional error reporting setup
ERROR_REPORTER = conf.get("error_reporter", {})

# Optional attempt count. set to 0 for infinite attempts
MAX_ATTEMPTS = conf.get('max_attempts', 0)

# OSX doesn't implement qsize because of missing sem_getvalue()
try:
QSIZE = Queue().qsize() == 0
Expand Down
18 changes: 18 additions & 0 deletions django_q/migrations/0013_task_attempt_count.py
@@ -0,0 +1,18 @@
# Generated by Django 3.0.7 on 2020-08-11 15:17

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('django_q', '0012_auto_20200702_1608'),
]

operations = [
migrations.AddField(
model_name='task',
name='attempt_count',
field=models.IntegerField(default=0),
),
]
1 change: 1 addition & 0 deletions django_q/models.py
Expand Up @@ -29,6 +29,7 @@ class Task(models.Model):
started = models.DateTimeField(editable=False)
stopped = models.DateTimeField(editable=False)
success = models.BooleanField(default=True, editable=False)
attempt_count = models.IntegerField(default=0)

@staticmethod
def get_result(task_id):
Expand Down
35 changes: 35 additions & 0 deletions django_q/tests/test_cluster.py
Expand Up @@ -397,6 +397,41 @@ def test_bad_secret(broker, monkeypatch):
broker.delete_queue()


@pytest.mark.django_db
def test_attempt_count(broker, monkeypatch):
monkeypatch.setattr(Conf, 'MAX_ATTEMPTS', 3)
tag = uuid()
task = {'id': tag[1],
'name': tag[0],
'func': 'math.copysign',
'args': (1, -1),
'kwargs': {},
'started': timezone.now(),
'stopped': timezone.now(),
'success': False,
'result': None}
# initial save - no success
save_task(task, broker)
assert Task.objects.filter(id=task['id']).exists()
saved_task = Task.objects.get(id=task['id'])
assert saved_task.attempt_count == 1
sleep(0.5)
# second save
old_stopped = task['stopped']
task['stopped'] = timezone.now()
save_task(task, broker)
saved_task = Task.objects.get(id=task['id'])
assert saved_task.attempt_count == 2
# third save -
task['stopped'] = timezone.now()
save_task(task, broker)
saved_task = Task.objects.get(id=task['id'])
assert saved_task.attempt_count == 3
# task should be removed from queue
assert broker.queue_size() == 0



@pytest.mark.django_db
def test_update_failed(broker):
tag = uuid()
Expand Down
23 changes: 23 additions & 0 deletions docs/admin.rst
Expand Up @@ -33,6 +33,29 @@ You can resubmit a failed task back to the queue using the admins action menu.

Uses the :class:`Failure` proxy model



Customize the admin UI by creating your own ``admin.ModelAdmin`` class and use ``admin.site.unregister`` and ``admin.site.register`` to replace the default
for example:

.. code-block:: python
from django_q import models as q_models
from django_q import admin as q_admin

admin.site.unregister([q_models.Failure])
@admin.register(q_models.Failure)
class ChildClassAdmin(q_admin.FailAdmin):
list_display = (
'name',
'func',
'result',
'started',
# add attempt_count to list_display
'attempt_count'
)



Scheduled tasks
---------------

Expand Down
9 changes: 9 additions & 0 deletions docs/configure.rst
Expand Up @@ -75,6 +75,15 @@ ack_failures

When set to ``True``, also acknowledge unsuccessful tasks. This causes failed tasks to be considered as successful deliveries, thereby removing them from the task queue. Can also be set per-task by passing the ``ack_failure`` option to :func:`async_task`. Defaults to ``False``.


.. _max_attempts:

max_attempts
~~~~~~~~~~~~~

Limit the number of retry attempts for failed tasks. Set to 0 for infinite retries. Defaults to 0


.. _retry:

retry
Expand Down