diff --git a/django_celery_extensions/beat.py b/django_celery_extensions/beat.py index 8bde642..cb8a44e 100644 --- a/django_celery_extensions/beat.py +++ b/django_celery_extensions/beat.py @@ -4,6 +4,7 @@ try: from celery.beat import PersistentScheduler from celery.signals import beat_init + from celery.utils.log import get_logger except ImportError: raise ImproperlyConfigured('Missing celery library, please install it') @@ -15,31 +16,7 @@ from .config import settings -# Copied from: -# https://github.com/andymccurdy/redis-py/blob/master/redis/lock.py#L33 -# Changes: -# The second line from the bottom: The original Lua script intends -# to extend time to (lock remaining time + additional time); while -# the script here extend time to a expected expiration time. -# KEYS[1] - lock name -# ARGS[1] - token -# ARGS[2] - additional milliseconds -# return 1 if the locks time was extended, otherwise 0 -LUA_EXTEND_TO_SCRIPT = """ - local token = redis.call('get', KEYS[1]) - if not token or token ~= ARGV[1] then - return 0 - end - local expiration = redis.call('pttl', KEYS[1]) - if not expiration then - expiration = 0 - end - if expiration < 0 then - return 0 - end - redis.call('pexpire', KEYS[1], ARGV[2]) - return 1 -""" +logger = get_logger(__name__) class LockedPersistentScheduler(PersistentScheduler): @@ -51,12 +28,14 @@ class LockedPersistentScheduler(PersistentScheduler): def tick(self, *args, **kwargs): if self.lock: - self.lock.extend(int(self.lock_timeout)) + logger.debug('beat: Extending lock...') + self.lock.extend(int(self.lock_timeout), replace_ttl=True) res = super().tick(*args, **kwargs) - return res + return min(res, self.lock_sleep) def close(self): if self.lock: + logger.debug('beat: Releasing lock...') self.lock.release() self.lock = None super().close() @@ -71,14 +50,12 @@ def acquire_distributed_beat_lock(sender=None, **kwargs): if not isinstance(cache, RedisCache): raise ImproperlyConfigured('Only redis cache is allowed to use LockedPersistentScheduler') + logger.debug('beat: Acquiring lock...') lock = caches[settings.CACHE_NAME].lock( scheduler.lock_key, timeout=scheduler.lock_timeout, sleep=scheduler.lock_sleep ) - # overwrite redis-py's extend script - # which will add additional timeout instead of extend to a new timeout - lock.lua_extend = lock.redis.register_script(LUA_EXTEND_TO_SCRIPT) lock.acquire() scheduler.lock = lock