Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 7 additions & 30 deletions django_celery_extensions/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
try:
from celery.beat import PersistentScheduler
from celery.signals import beat_init
from celery.utils.log import get_logger
except ImportError:
raise ImproperlyConfigured('Missing celery library, please install it')

Expand All @@ -15,31 +16,7 @@
from .config import settings


# Copied from:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new redis already supports it

# https://github.com/andymccurdy/redis-py/blob/master/redis/lock.py#L33
# Changes:
# The second line from the bottom: The original Lua script intends
# to extend time to (lock remaining time + additional time); while
# the script here extend time to a expected expiration time.
# KEYS[1] - lock name
# ARGS[1] - token
# ARGS[2] - additional milliseconds
# return 1 if the locks time was extended, otherwise 0
LUA_EXTEND_TO_SCRIPT = """
local token = redis.call('get', KEYS[1])
if not token or token ~= ARGV[1] then
return 0
end
local expiration = redis.call('pttl', KEYS[1])
if not expiration then
expiration = 0
end
if expiration < 0 then
return 0
end
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
"""
logger = get_logger(__name__)


class LockedPersistentScheduler(PersistentScheduler):
Expand All @@ -51,12 +28,14 @@ class LockedPersistentScheduler(PersistentScheduler):

def tick(self, *args, **kwargs):
if self.lock:
self.lock.extend(int(self.lock_timeout))
logger.debug('beat: Extending lock...')
self.lock.extend(int(self.lock_timeout), replace_ttl=True)
res = super().tick(*args, **kwargs)
return res
return min(res, self.lock_sleep)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tick must be lower than lock_timeout


def close(self):
if self.lock:
logger.debug('beat: Releasing lock...')
self.lock.release()
self.lock = None
super().close()
Expand All @@ -71,14 +50,12 @@ def acquire_distributed_beat_lock(sender=None, **kwargs):
if not isinstance(cache, RedisCache):
raise ImproperlyConfigured('Only redis cache is allowed to use LockedPersistentScheduler')

logger.debug('beat: Acquiring lock...')
lock = caches[settings.CACHE_NAME].lock(
scheduler.lock_key,
timeout=scheduler.lock_timeout,
sleep=scheduler.lock_sleep
)

# overwrite redis-py's extend script
# which will add additional timeout instead of extend to a new timeout
lock.lua_extend = lock.redis.register_script(LUA_EXTEND_TO_SCRIPT)
lock.acquire()
scheduler.lock = lock