Skip to content

Commit

Permalink
Add option (default=True) for rescheduling callbacks along w/task.
Browse files Browse the repository at this point in the history
Also reschedule the original priority and support overriding priority
when rescheduling.
  • Loading branch information
coleifer committed Mar 11, 2023
1 parent e01f71b commit eacdd85
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 5 deletions.
14 changes: 12 additions & 2 deletions huey/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,21 +996,31 @@ def revoke(self, revoke_once=True):
def restore(self):
return self.huey.restore(self.task)

def reschedule(self, eta=None, delay=None, expires=None):
def reschedule(self, eta=None, delay=None, expires=None, priority=None,
preserve_pipeline=True):
# Rescheduling works by revoking the currently-scheduled task (nothing
# is done to check if the task has already run, however). Then the
# original task's data is used to enqueue a new task with a new task ID
# and execution_time.
self.revoke()
if eta is not None or delay is not None:
eta = normalize_time(eta, delay, self.huey.utc)
if preserve_pipeline:
on_complete = self.task.on_complete
on_error = self.task.on_error
else:
on_complete = on_error = None

task = type(self.task)(
self.task.args,
self.task.kwargs,
eta=eta,
retries=self.task.retries,
retry_delay=self.task.retry_delay,
expires=expires if expires is not None else self.task.expires)
priority=priority if priority is not None else self.task.priority,
expires=expires if expires is not None else self.task.expires,
on_complete=on_complete,
on_error=on_error)
return self.huey.enqueue(task)

def reset(self):
Expand Down
40 changes: 37 additions & 3 deletions huey/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,10 @@ def test_schedule_s(self):
def add(a, b):
return a + b

task = add.s((1, 2), delay=10)
task = task.then(add, (3,), delay=20)
self.huey.execute(task)
task = add.s(1, 2, delay=10)
task = task.then(add, 3, delay=20)
r1, r2 = self.huey.enqueue(task)
self.execute_next()

sched = self.huey.scheduled()
self.assertEqual(len(sched), 1)
Expand All @@ -148,6 +149,16 @@ def add(a, b):
self.assertFalse(self.huey.ready_to_run(oc, t10))
self.assertTrue(self.huey.ready_to_run(oc, t20))

r = r1.reschedule()
self.assertEqual(len(self.huey), 1)
self.assertEqual(self.execute_next(), 3)

self.assertEqual(len(self.huey), 1)
task = self.huey.dequeue()
self.assertFalse(self.huey.ready_to_run(task))
self.assertTrue(self.huey.ready_to_run(task, t20))
self.assertEqual(self.huey.execute(task, t20), 6)

def test_revoke_task(self):
state = {}
@self.huey.task()
Expand Down Expand Up @@ -568,6 +579,29 @@ def task_s(task=None):
self.assertEqual(self.huey.execute(task), True)
self.assertEqual(state, [res3.id])

def test_reschedule_priority(self):
state = []

@self.huey.task(context=True)
def task_s(task=None):
state.append(task.id)
return True

res = task_s(priority=10)
res2 = res.reschedule(priority=99)
self.assertEqual(len(self.huey), 2)

task = self.huey.dequeue()
self.assertEqual(task.priority, 99)
self.assertTrue(self.huey.execute(task))

task = self.huey.dequeue()
self.assertEqual(task.priority, 10)
self.assertTrue(res.is_revoked())
self.assertTrue(self.huey.execute(task) is None)

self.assertEqual(state, [res2.id])

def test_task_error(self):
@self.huey.task()
def task_e(n):
Expand Down

0 comments on commit eacdd85

Please sign in to comment.