Skip to content

Commit

Permalink
[#5249] Add decorator to make django-q tasks unique
Browse files Browse the repository at this point in the history
django-q doesn't support tracking of active task.
We therefore cannot make a simple query of "is this task already running".

All that is provided is a django signal before a task is executed with the task dictionary
 https://django-q.readthedocs.io/en/latest/signals.html#before-executing-a-task .
That's unfortunately not useful for our scenario as the edge case of an unexpected shutdown
 will block the unique task from executing in the future.
Were a timeout to be added and the task to still be running, the same task could be started again.

The chosen solution therefore, is a heartbeat thread.
  • Loading branch information
MichaelAkvo committed Mar 20, 2023
1 parent 869b5c9 commit 7bed016
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 4 deletions.
45 changes: 45 additions & 0 deletions akvo/cache/heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import datetime
import logging
from threading import Event, Thread
from typing import Union

from django.core.cache import cache


class CacheHeartbeat(Thread):
"""
Thread to update set a cache key with a max life and refresh it as long as the thread is alive
The thread can be ended by setting the `event_end` flag
"""

def __init__(self, cache_key: str, key_timeout: float = 30.0, beat_interval: int = 3):
"""
:param cache_key: The cache key to keep alive
:param key_timeout: How long the cache key should live without the heartbeat thread
:param beat_interval: How often per timeout the key should "beat"
"""
super().__init__()
self.cache_key = cache_key
self.event_end = Event()
self.key_timeout = key_timeout
self.beat_interval = beat_interval

def run(self) -> None:
logger = logging.getLogger("akvo.rsr.CacheHeartBeat")
logger.info("Starting cache heartbeat for '%s' with timeout %s", self.cache_key, self.key_timeout)
self.event_end.clear()
while not self.event_end.is_set():
# Refresh the heartbeat
self.set_cache_value()
self.event_end.wait(self.key_timeout / self.beat_interval)

cache.delete(self.cache_key)
logger.info("Ended cache heartbeat for '%s'", self.cache_key)

def set_cache_value(self):
cache.set(self.cache_key, self.get_calc_value(), self.key_timeout)

def get_calc_value(self) -> Union[str, int, float]:
return datetime.datetime.utcnow().timestamp()
Empty file.
47 changes: 47 additions & 0 deletions akvo/rsr/tests/usecases/django_q/test_decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import datetime

from django.core.cache import cache
from django_q.models import Task

from akvo.rsr.tests.base import BaseTestCase
from akvo.rsr.usecases.django_q.decorators import (
UNIQUE_KEY_FORMAT, get_unique_cache_heartbeat, unique_task,
)


class UniqueTaskTestcase(BaseTestCase):

def setUp(self):
super().setUp()

def must_be_unique():
return "I was unique once"

self.unique_task_name = "must_be_unique"
self.unique_cache_key = UNIQUE_KEY_FORMAT.format(task_name=self.unique_task_name)
self.unique_method = unique_task(self.unique_task_name)(must_be_unique)

cache.delete(self.unique_cache_key)

def test_no_running_task(self):
self.assertEqual(self.unique_method(), "I was unique once")
self.assertFalse(self.unique_cache_key in cache)

def test_old_running_task(self):
cache.set(UNIQUE_KEY_FORMAT.format(task_name=self.unique_task_name), 0)
Task.objects.create(
id="an id",
name=self.unique_task_name,
func="must_be_unique",
started=datetime.datetime.now(),
stopped=datetime.datetime.now(),
)
self.assertEqual(self.unique_method(), "I was unique once")
self.assertFalse(self.unique_cache_key in cache)

def test_running_task(self):
cache_heartbeat = get_unique_cache_heartbeat(self.unique_cache_key)
cache_heartbeat.set_cache_value()

self.assertIsNone(self.unique_method())
self.assertTrue(self.unique_cache_key in cache)
62 changes: 62 additions & 0 deletions akvo/rsr/usecases/django_q/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import datetime
import logging
from datetime import timedelta
from functools import wraps
from typing import Callable

from django.conf import settings
from django.core.cache import cache

from akvo.cache.heartbeat import CacheHeartbeat

UNIQUE_KEY_FORMAT = "unique_django_q:{task_name}"


def unique_task(task_name: str) -> Callable[[Callable], Callable]:
"""
Creates a decorator to ensure that the task isn't executed in parallel
:param task_name: The task's unique name
:return: A decorator
"""

def decorator(func):

@wraps(func)
def wrapper(*args, **kwargs):
logger = logging.getLogger("akvo.rsr.unique_task_wrapper")

cache_key = UNIQUE_KEY_FORMAT.format(task_name=task_name)
cached_time_utc: float = cache.get(cache_key)

if cached_time_utc:
key_timeout = settings.UNIQUE_TASK_KEY_TIMEOUT
# Key timed out?
if cached_time_utc < (datetime.datetime.utcnow() - timedelta(seconds=key_timeout)).timestamp():
cache.delete(cache_key)
else:
logger.info("%s has a valid unique task heartbeat. Skipping run...", task_name)
return

heartbeat_thread = get_unique_cache_heartbeat(cache_key)
heartbeat_thread.start()
try:
return func(*args, **kwargs)
finally:
# Let the heartbeat thread end
logger.info("Signaling '%s' heartbeat thread should end", task_name)
heartbeat_thread.event_end.set()

# Clean up the cache
try:
cache.delete(cache_key)
except:
logger.warning("Couldn't delete cache key %s", cache_key, exc_info=True)

return wrapper

return decorator


def get_unique_cache_heartbeat(cache_key):
return CacheHeartbeat(cache_key, key_timeout=settings.UNIQUE_TASK_KEY_TIMEOUT)
2 changes: 2 additions & 0 deletions akvo/rsr/usecases/jobs/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from django.db.models import QuerySet
from django.db.transaction import atomic

from akvo.rsr.usecases.django_q.decorators import unique_task
from akvo.utils import rsr_send_mail_to_users

if TYPE_CHECKING:
Expand Down Expand Up @@ -82,6 +83,7 @@ def _create_aggregation_job(period: IndicatorPeriod) -> IndicatorPeriodAggregati
return IndicatorPeriodAggregationJob.objects.create(period=period, root_period=root_period)


@unique_task("execute_aggregation_jobs")
def execute_aggregation_jobs():
"""
Call the aggregation function for each aggregation job
Expand Down
7 changes: 6 additions & 1 deletion akvo/settings/41-django-q.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# Used by akvo.rsr.usecases.django_q.decorators.unique_task
# Determines how long the heartbeat of a unique task should linger
UNIQUE_TASK_KEY_TIMEOUT = 30
# Max time in seconds for a task to complete
ASYNC_TASK_TIMEOUT = 5 * 60
Q_CLUSTER = {
"name": "akvo-rsr",
"recycle": 500,
"timeout": 60,
"timeout": ASYNC_TASK_TIMEOUT,
"compress": True,
"save_limit": 250,
"queue_limit": 500,
Expand Down
9 changes: 6 additions & 3 deletions akvo/settings/90-finish.conf
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,12 @@ AKVO_JOBS = {
"cron": "*/5 * * * *",
"args": ( "perform_iati_checks", )
},
"run_aggregation_jobs": {
"func": "django.core.management.call_command",
"execute_aggregation_jobs": {
"func": "akvo.rsr.usecases.jobs.aggregation.execute_aggregation_jobs",
"cron": "* * * * *",
"args": ( "run_aggregation_jobs", )
"args": (),
"kwargs": {
"task_name": "execute_aggregation_jobs",
},
},
}

0 comments on commit 7bed016

Please sign in to comment.