Skip to content

Commit

Permalink
Added test_stamping_example_canvas to validate the new stamping examp…
Browse files Browse the repository at this point in the history
…le canvas is calculated correctly using automatic tests
  • Loading branch information
Nusnus authored and auvipy committed Dec 4, 2022
1 parent 5eaa6ac commit aad5ff1
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
6 changes: 6 additions & 0 deletions t/integration/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ def add(x, y, z=None):
return x + y


@shared_task
def mul(x: int, y: int) -> int:
"""Multiply two numbers"""
return x * y


@shared_task
def write_to_file_and_return_int(file_name, i):
with open(file_name, mode='a', buffering=1) as file_handle:
Expand Down
17 changes: 16 additions & 1 deletion t/integration/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .tasks import (ExpectedException, StampOnReplace, add, add_chord_to_chord, add_replaced, add_to_all,
add_to_all_to_chord, build_chain_inside_task, collect_ids, delayed_sum,
delayed_sum_with_soft_guard, errback_new_style, errback_old_style, fail, fail_replaced, identity,
ids, print_unicode, raise_error, redis_count, redis_echo, redis_echo_group_id,
ids, mul, print_unicode, raise_error, redis_count, redis_echo, redis_echo_group_id,
replace_with_chain, replace_with_chain_which_raises, replace_with_empty_chain,
replace_with_stamped_task, retry_once, return_exception, return_priority, second_order_replace1,
tsum, write_to_file_and_return_int, xsum)
Expand Down Expand Up @@ -506,6 +506,21 @@ def test_chain_of_a_chord_and_three_tasks_and_a_group(self, manager):
res = c()
assert res.get(timeout=TIMEOUT) == [8, 8]

def test_stamping_example_canvas(self, manager):
"""Test the stamping example canvas from the examples directory"""
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

c = chain(
group(identity.s(i) for i in range(1, 4)) | xsum.s(),
chord(group(mul.s(10) for _ in range(1, 4)), xsum.s()),
)

res = c()
assert res.get(timeout=TIMEOUT) == 180

@pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout")
def test_nested_chain_group_lone(self, manager):
"""
Expand Down

0 comments on commit aad5ff1

Please sign in to comment.