Skip to content

Commit

Permalink
Add option to override delay and eta with RetryTask exception.
Browse files Browse the repository at this point in the history
  • Loading branch information
coleifer committed Apr 28, 2023
1 parent aeaa36e commit fa999de
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 5 deletions.
6 changes: 5 additions & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1426,12 +1426,16 @@ Exceptions
more retries remaining**. Similarly, if ``retry=True`` then the task will
be retried regardless.

.. py:class:: RetryTask
.. py:class:: RetryTask(msg=None, delay=None, eta=None)
Raised by user code from within a :py:meth:`~Huey.task` function to force a
retry. When this exception is raised, the task will be retried irrespective
of whether it is configured with automatic retries.

If ``delay`` or ``eta`` is specified, then any ``retry_delay`` set on the
task will be overridden and the value specified will be used to determine
when the task will be retried next.

.. py:class:: TaskException
General exception raised by :py:class:`Result` handles when reading the
Expand Down
12 changes: 9 additions & 3 deletions huey/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ def _execute(self, task, timestamp):

start = time_clock()
exception = None
retry_eta = None
task_value = None

try:
Expand All @@ -388,6 +389,8 @@ def _execute(self, task, timestamp):
except RetryTask as exc:
logger.info('Task %s raised RetryTask, retrying.', task.id)
task.retries += 1
if exc.eta or exc.delay is not None:
retry_eta = normalize_time(exc.eta, exc.delay, self.utc)
exception = exc
except CancelExecution as exc:
if exc.retry or (exc.retry is None and task.retries):
Expand Down Expand Up @@ -440,14 +443,17 @@ def _execute(self, task, timestamp):

if exception is not None and task.retries:
self._emit(S.SIGNAL_RETRYING, task)
self._requeue_task(task, self._get_timestamp())
self._requeue_task(task, self._get_timestamp(), retry_eta)

return task_value

def _requeue_task(self, task, timestamp):
def _requeue_task(self, task, timestamp, retry_eta=None):
task.retries -= 1
logger.info('Requeueing %s, %s retries', task.id, task.retries)
if task.retry_delay:
if retry_eta is not None:
task.eta = retry_eta
self.add_schedule(task)
elif task.retry_delay:
delay = datetime.timedelta(seconds=task.retry_delay)
task.eta = timestamp + delay
self.add_schedule(task)
Expand Down
5 changes: 4 additions & 1 deletion huey/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ class CancelExecution(Exception):
def __init__(self, retry=None, *args, **kwargs):
self.retry = retry
super(CancelExecution, self).__init__(*args, **kwargs)
class RetryTask(Exception): pass
class RetryTask(Exception):
def __init__(self, msg=None, eta=None, delay=None, *args, **kwargs):
self.eta, self.delay = eta, delay
super(RetryTask, self).__init__(msg, *args, **kwargs)
class TaskException(Exception):
def __init__(self, metadata=None, *args):
self.metadata = metadata or {}
Expand Down
32 changes: 32 additions & 0 deletions huey/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,38 @@ def task_p():
dt = datetime.datetime.now() + datetime.timedelta(seconds=61)
self.assertTrue(self.huey.ready_to_run(task, dt))

def test_retrytask_eta_delay(self):
@self.huey.task(retry_delay=10)
def task_a(d=None, e=None):
raise RetryTask(delay=d, eta=e)

seconds = lambda s: (datetime.datetime.now() +
datetime.timedelta(seconds=s))

def run_iteration(d, e):
r = task_a(d=d, e=e)
self.assertTrue(self.execute_next() is None)
self.assertRaises(TaskException, r.get)
self.assertEqual(len(self.huey), 0)
self.assertEqual(len(self.huey.scheduled()), 1)

task, = self.huey.scheduled()
self.assertEqual(task.id, r.id)
self.assertFalse(self.huey.ready_to_run(task))
if e is not None:
self.assertEqual(task.eta, e)
else:
d = d or 10 # Default.
self.assertTrue(seconds(d - 3) < task.eta < seconds(d + 3))

self.huey.flush()

run_iteration(None, None)
run_iteration(3, None)
run_iteration(300, None)
run_iteration(None, seconds(3))
run_iteration(None, seconds(300))

def test_retrytask_explicit(self):
state = [0]

Expand Down

0 comments on commit fa999de

Please sign in to comment.