Skip to content

Commit

Permalink
Add new consumer option / argument to flush_locks() to accept names.
Browse files Browse the repository at this point in the history
  • Loading branch information
coleifer committed Nov 28, 2021
1 parent e77acf3 commit b488a36
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 7 deletions.
12 changes: 12 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,18 @@ Huey object
with huey.lock_task('db-backup'):
do_db_backup()
.. py:method:: flush_locks(*names)
:param names: additional lock-names to flush.
:returns: set of lock names that were set and subsequently cleared.

Flush any locks that may currently be held. Locks used by top-level
tasks or functions decorated with :py:meth:`~Huey.lock_task` are
registered as a side-effect of importing the module, but locks acquired
in nested scopes (e.g. by a context-manager inside a task function) may
not be registered. Such undiscovered locks can be flushed by passing
their lock-names explicitly.

.. py:method:: put(key, value)
:param key: key for data
Expand Down
6 changes: 6 additions & 0 deletions docs/consumer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ their default values.
Flush all locks when starting the consumer. This may be useful if the
consumer was killed abruptly while executing a locked task.

``-L``, ``--extra-locks``
Comma-separated list of additional lock-names to flush when starting the
consumer. This is useful if you have locks acquired within context-managers
that cannot be discovered during consumer startup but which you want to
ensure are cleared. Implies ``--flush-locks``.

``-s``, ``--scheduler-interval``
The frequency with which the scheduler should run. By default this will run
every second, but you can increase the interval to as much as 60 seconds.
Expand Down
12 changes: 10 additions & 2 deletions huey/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import inspect
import itertools
import logging
import re
import time
Expand Down Expand Up @@ -597,11 +598,18 @@ def flush(self):
def lock_task(self, lock_name):
return TaskLock(self, lock_name)

def flush_locks(self, *names):
    """
    Flush any locks that may currently be held.

    :param names: additional lock-names to flush. Use this for locks that
        are not registered at import time (e.g. locks created inside a
        nested scope, such as a context-manager within a task function).
    :returns: set of lock names that were set and subsequently cleared.
    """
    lock_keys = list(self._locks)
    if names:
        # Expand each bare lock-name into its fully-qualified storage key.
        lock_template = '%s.lock.%%s' % self.name
        lock_keys.extend(lock_template % name.strip() for name in names)

    # Attempt to delete every candidate key; report the bare names of the
    # locks that were actually held (delete() returns falsy for misses).
    return {key.split('.lock.', 1)[-1]
            for key in lock_keys
            if self.delete(key)}

def result(self, id, blocking=False, timeout=None, backoff=1.15,
Expand Down
12 changes: 7 additions & 5 deletions huey/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ class Consumer(object):
def __init__(self, huey, workers=1, periodic=True, initial_delay=0.1,
backoff=1.15, max_delay=10.0, scheduler_interval=1,
worker_type=WORKER_THREAD, check_worker_health=True,
health_check_interval=10, flush_locks=False):
health_check_interval=10, flush_locks=False,
extra_locks=None):

self._logger = logging.getLogger('huey.consumer')
if huey.immediate:
Expand Down Expand Up @@ -295,8 +296,9 @@ def __init__(self, huey, workers=1, periodic=True, initial_delay=0.1,

# In the event the consumer was killed while running a task that held
# a lock, this ensures that all locks are flushed before starting.
if flush_locks:
self.flush_locks()
if flush_locks or extra_locks:
lock_names = extra_locks.split(',') if extra_locks else ()
self.flush_locks(*lock_names)

# Create the scheduler process (but don't start it yet).
scheduler = self._create_scheduler()
Expand All @@ -313,9 +315,9 @@ def __init__(self, huey, workers=1, periodic=True, initial_delay=0.1,
# but it is referenced in the test-suite.
self.worker_threads.append((worker, process))

def flush_locks(self):
def flush_locks(self, *names):
    """
    Clear any locks left over from a previous run (e.g. if the consumer
    was killed while a locked task was executing), plus any extra
    lock-names given explicitly.
    """
    self._logger.debug('Flushing locks before starting up.')
    stale = self.huey.flush_locks(*names)
    if stale:
        # Stale locks indicate an unclean shutdown -- worth surfacing.
        self._logger.warning('Found stale locks: %s' % (
            ', '.join(key for key in stale)))
Expand Down
3 changes: 3 additions & 0 deletions huey/consumer_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
('verbose', None),
('simple_log', None),
('flush_locks', False),
('extra_locks', None),
)
config_keys = [param for param, _ in config_defaults]

Expand Down Expand Up @@ -67,6 +68,8 @@ def get_worker_options(self):
'restarting any worker that crashes unexpectedly.')),
option('flush_locks', action='store_true', dest='flush_locks',
help=('flush all locks when starting consumer.')),
option(('L', 'extra_locks'), dest='extra_locks',
help=('additional locks to flush, separated by comma.')),
)

def get_scheduler_options(self):
Expand Down
9 changes: 9 additions & 0 deletions huey/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,15 @@ def test_flush_locks(self):
self.assertEqual(flushed, set(['lock1', 'lock2']))
self.assertEqual(self.huey.flush_locks(), set())

def test_flush_named_locks(self):
self.huey.put_if_empty('%s.lock.lock1' % self.huey.name, '1')
self.huey.put_if_empty('%s.lock.lock2' % self.huey.name, '1')
with self.huey.lock_task('lock3'):
flushed = self.huey.flush_locks('lock1', 'lock2', 'lockx')

self.assertEqual(flushed, set(['lock1', 'lock2', 'lock3']))
self.assertEqual(self.huey.flush_locks(), set())

def test_serialize_deserialize(self):
@self.huey.task()
def task_a(n):
Expand Down

0 comments on commit b488a36

Please sign in to comment.