Skip to content

Commit

Permalink
Add support for INTERRUPTED signal when using threads/gevent.
Browse files Browse the repository at this point in the history
  • Loading branch information
coleifer committed Dec 29, 2020
1 parent 8cbc11f commit 76c2b37
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 2 deletions.
3 changes: 1 addition & 2 deletions docs/signals.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ The following signals are implemented by Huey:
* ``SIGNAL_REVOKED``: task is revoked and will not be executed.
* ``SIGNAL_SCHEDULED``: task is not yet ready to run and has been added to the
schedule for future execution.
* ``SIGNAL_INTERRUPTED``: task is interrupted when consumer exits - **only
available when using the `process` worker model.**
* ``SIGNAL_INTERRUPTED``: task is interrupted when consumer exits

When a signal handler is called, it will be called with the following
arguments:
Expand Down
11 changes: 11 additions & 0 deletions huey/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def __init__(self, name='huey', results=True, store_none=False, utc=True,
self._shutdown = OrderedDict()
self._registry = Registry()
self._signal = S.Signal()
self._tasks_in_flight = set()

def create_storage(self):
# When using immediate mode, the default behavior is to use an
Expand Down Expand Up @@ -254,6 +255,11 @@ def unregister_on_shutdown(self, name=None):
name = name.__name__
return self._shutdown.pop(name, None) is not None

def notify_interrupted_tasks(self):
while self._tasks_in_flight:
task = self._tasks_in_flight.pop()
self._emit(S.SIGNAL_INTERRUPTED, task)

def signal(self, *signals):
def decorator(fn):
self._signal.connect(fn, *signals)
Expand Down Expand Up @@ -356,9 +362,11 @@ def _execute(self, task, timestamp):
task_value = None

try:
self._tasks_in_flight.add(task)
try:
task_value = task.execute()
finally:
self._tasks_in_flight.remove(task)
duration = time_clock() - start
except TaskLockedException as exc:
logger.warning('Task %s not run, unable to acquire lock.', task.id)
Expand Down Expand Up @@ -634,6 +642,9 @@ def __repr__(self):
rep += ', on error %s' % self.on_error
return rep

def __hash__(self):
return hash(self.id)

def create_id(self):
return str(uuid.uuid4())

Expand Down
2 changes: 2 additions & 0 deletions huey/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ def run(self):
health_check_ts = now
self.check_worker_health()

self.huey.notify_interrupted_tasks()

if self._restart:
self._logger.info('Consumer will restart.')
python = sys.executable
Expand Down

0 comments on commit 76c2b37

Please sign in to comment.