Skip to content

Commit

Permalink
Add support for pipelines with task instances.
Browse files Browse the repository at this point in the history
This also enables support for chaining one pipeline to another.

Fixes #491
  • Loading branch information
coleifer committed Feb 23, 2020
1 parent f30d147 commit bc8bb73
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
14 changes: 12 additions & 2 deletions huey/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,14 +653,24 @@ def then(self, task, *args, **kwargs):
if self.on_complete:
self.on_complete.then(task, *args, **kwargs)
else:
self.on_complete = task.s(*args, **kwargs)
if isinstance(task, Task):
if args: task.extend_data(args)
if kwargs: task.extend_data(kwargs)
else:
task = task.s(*args, **kwargs)
self.on_complete = task
return self

def error(self, task, *args, **kwargs):
if self.on_error:
self.on_error.error(task, *args, **kwargs)
else:
self.on_error = task.s(*args, **kwargs)
if isinstance(task, Task):
if args: task.extend_data(args)
if kwargs: task.extend_data(kwargs)
else:
task = task.s(*args, **kwargs)
self.on_error = task
return self

def execute(self):
Expand Down
31 changes: 31 additions & 0 deletions huey/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -958,13 +958,44 @@ def mul(a, b):
pipe = add.s(1, 2).then(mul, 4).then(add, -5).then(mul, 3).then(add, 8)
self.assertPipe(pipe, [3, 12, 7, 21, 29])

def test_mixed_tasks_instances(self):
@self.huey.task()
def add(a, b):
return a + b
@self.huey.task()
def mul(a, b):
return a * b

t1 = add.s(1, 2)
t2 = add.s(3)
t3 = mul.s(4)
t4 = mul.s()
p1 = t1.then(t2)
p2 = p1.then(t3).then(t4, 5)
self.assertPipe(p2, [3, 6, 24, 120])

def test_task_instances_args_kwargs(self):
@self.huey.task()
def add(a, b, c=None):
s = a + b
if c is not None:
s += c
return s

t1 = add.s(1, 2)
t2 = add.s()
t3 = add.s()
p1 = t1.then(t2, 3).then(t3, 4, c=5)
self.assertPipe(p1, [3, 6, 15])

def assertPipe(self, pipeline, expected):
results = self.huey.enqueue(pipeline)
for _ in range(len(results)):
self.assertEqual(len(self.huey), 1)
self.execute_next()

self.assertEqual(len(self.huey), 0)
self.assertEqual(len(results), len(expected))
self.assertEqual([r() for r in results], expected)

def test_error_callback(self):
Expand Down

0 comments on commit bc8bb73

Please sign in to comment.