Skip to content

Commit

Permalink
Allow raising CancelExecution within a Task, and override retries.
Browse files Browse the repository at this point in the history
  • Loading branch information
coleifer committed Feb 22, 2022
1 parent b7aebe4 commit b17c8dc
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 4 deletions.
14 changes: 12 additions & 2 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1408,8 +1408,18 @@ Exceptions

.. py:class:: CancelExecution
Should be raised by user code within a :py:meth:`~Huey.pre_execute` hook to
signal to the consumer that the task shall be cancelled.
Cancel the execution of a task. Can be raised either within a
:py:meth:`~Huey.pre_execute` hook, or within a
:py:meth:`~Huey.task`-decorated function.

When raised from within a :py:meth:`~Huey.pre_execute` hook, this exception
signals to the consumer that the task shall be cancelled and not run.

When raised in the body of a :py:meth:`~Huey.task`-decorated function, this
exception accepts a boolean ``retry`` parameter (default is ``False``). If
``retry=False`` then the task will not be retried, **even if it has 1 or
more retries remaining**. Similarly, if ``retry=True`` then the task will
be retried regardless.

.. py:class:: RetryTask
Expand Down
33 changes: 33 additions & 0 deletions docs/guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,39 @@ API docs:
* :py:meth:`TaskWrapper.is_revoked` for checking the status of the task
function itself.

Canceling from within a Task
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Huey provides a special :py:class:`CancelExecution` exception which can be
raised, either within a :py:meth:`~Huey.pre_execute` hook or within the body of
a :py:meth:`~Huey.task`-decorated function, to cancel the execution of the
task. Additionally, when raised from within a task, the ``CancelExecution``
exception can override the task's default retry policy, by specifying either
``retry=True/False``.

Example:

.. code:: python
@huey.task(retries=2)
def load_data():
if something_temporary_is_wrong():
# Task will be retried, even if it has run out of retries or is a
# task that does not specify any automatic retries.
raise CancelExecution(retry=True)
elif something_fatal_is_wrong():
# Task will NOT be retried, even if it has more than one retry
# remaining.
raise CancelExecution(retry=False)
elif cancel_and_maybe_retry():
# Task will only be retried if it has one or more retries
# remaining (this is the default).
raise CancelExecution()
...
For more information, see: :py:class:`CancelExecution`.

Canceling or pausing periodic tasks
-----------------------------------

Expand Down
14 changes: 13 additions & 1 deletion huey/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@


logger = logging.getLogger('huey')
_sentinel = object()


class Huey(object):
Expand Down Expand Up @@ -388,6 +389,16 @@ def _execute(self, task, timestamp):
logger.info('Task %s raised RetryTask, retrying.', task.id)
task.retries += 1
exception = exc
except CancelExecution as exc:
if exc.retry or (exc.retry is None and task.retries):
task.retries = max(task.retries, 1)
msg = '(task will be retried)'
else:
task.retries = 0
msg = '(aborted, will not be retried)'
logger.warning('Task %s raised CancelExecution %s.', task.id, msg)
self._emit(S.SIGNAL_CANCELED, task)
exception = exc
except KeyboardInterrupt:
logger.warning('Received exit signal, %s did not finish.', task.id)
self._emit(S.SIGNAL_INTERRUPTED, task)
Expand Down Expand Up @@ -444,7 +455,8 @@ def _run_pre_execute(self, task):
try:
callback(task)
except CancelExecution:
logger.warning('Task %s cancelled by %s.', task, name)
logger.warning('Task %s cancelled by %s (pre-execute).',
task, name)
raise
except Exception:
logger.exception('Unhandled exception calling pre-execute '
Expand Down
5 changes: 4 additions & 1 deletion huey/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ class HueyException(Exception): pass
class ConfigurationError(HueyException): pass
class TaskLockedException(HueyException): pass

class CancelExecution(Exception): pass
class CancelExecution(Exception):
def __init__(self, retry=None, *args, **kwargs):
self.retry = retry
super(CancelExecution, self).__init__(*args, **kwargs)
class RetryTask(Exception): pass
class TaskException(Exception):
def __init__(self, metadata=None, *args):
Expand Down
71 changes: 71 additions & 0 deletions huey/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,77 @@ def task_a(n):
self.assertEqual(len(self.huey), 0)
self.assertEqual(self.huey.result_count(), 0)

def test_cancel_execution(self):
@self.huey.task()
def task_a(n=None):
raise CancelExecution(retry=n)

r = task_a()
self.assertTrue(self.execute_next() is None)
self.assertRaises(TaskException, r.get)
self.assertEqual(len(self.huey), 0)

r = task_a(False)
self.assertTrue(self.execute_next() is None)
self.assertRaises(TaskException, r.get)
self.assertEqual(len(self.huey), 0)

r = task_a(True)
self.assertTrue(self.execute_next() is None)
self.assertRaises(TaskException, r.get)
try:
r.get()
except TaskException as exc:
metadata = exc.metadata
self.assertEqual(metadata['error'], 'CancelExecution()')
self.assertEqual(metadata['retries'], 1)
self.assertEqual(len(self.huey), 1)

def test_cancel_execution_task_retries(self):
@self.huey.task(retries=2)
def task_a(n=None):
raise CancelExecution(retry=n)

# Even though the task itself declares retries, these retries are
# ignored when cancel is raised with retry=False.
r = task_a(False)
self.assertTrue(self.execute_next() is None)
self.assertRaises(TaskException, r.get)
self.assertEqual(len(self.huey), 0)

# The original task specified 2 retries. When Cancel is raised with
# retry=None, it proceeds normally.
r = task_a()
for retries in (2, 1, 0):
self.assertTrue(self.execute_next() is None)
self.assertRaises(TaskException, r.get)
try:
r.get()
except TaskException as exc:
metadata = exc.metadata
self.assertEqual(metadata['error'], 'CancelExecution()')
self.assertEqual(metadata['retries'], retries)
if retries > 0:
self.assertEqual(len(self.huey), 1)
else:
self.assertEqual(len(self.huey), 0)
r.reset()

# The original task specified 2 retries. When Cancel is raised with
# retry=True, then max(task.retries, 1) will be used.
r = task_a(True)
for retries in (2, 1, 1, 1):
self.assertTrue(self.execute_next() is None)
self.assertRaises(TaskException, r.get)
try:
r.get()
except TaskException as exc:
metadata = exc.metadata
self.assertEqual(metadata['error'], 'CancelExecution()')
self.assertEqual(metadata['retries'], retries)
self.assertEqual(len(self.huey), 1)
r.reset()

def test_read_schedule(self):
@self.huey.task()
def task_a(n):
Expand Down

0 comments on commit b17c8dc

Please sign in to comment.