Skip to content

Commit

Permalink
Stabilized test_mutable_errback_called_by_chord_from_group_fail_multiple
Browse files Browse the repository at this point in the history
  • Loading branch information
Nusnus committed Oct 20, 2022
1 parent 7d4fe22 commit 89db817
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions t/integration/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -2472,10 +2472,8 @@ def test_immutable_errback_called_by_chord_from_group_fail_multiple(
await_redis_count(fail_task_count, redis_key=redis_key)
redis_connection.delete(redis_key)

@pytest.mark.parametrize(
"errback_task", [errback_old_style, errback_new_style, ],
)
def test_mutable_errback_called_by_chord_from_group_fail_multiple(
@pytest.mark.parametrize("errback_task", [errback_old_style, errback_new_style])
def test_mutable_errback_called_by_chord_from_group_fail_multiple_on_header_failure(
self, errback_task, manager, subtests
):
if not manager.app.conf.result_backend.startswith("redis"):
Expand All @@ -2488,11 +2486,10 @@ def test_mutable_errback_called_by_chord_from_group_fail_multiple(
fail_sigs = tuple(
fail.s() for _ in range(fail_task_count)
)
fail_sig_ids = tuple(s.freeze().id for s in fail_sigs)
errback = errback_task.s()
# Include a mix of passing and failing tasks
child_sig = group(
*(identity.si(42) for _ in range(24)), # arbitrary task count
*(identity.si(42) for _ in range(8)), # arbitrary task count
*fail_sigs,
)

Expand All @@ -2510,6 +2507,28 @@ def test_mutable_errback_called_by_chord_from_group_fail_multiple(
# is attached to the chord body which is a single task!
await_redis_count(1, redis_key=expected_redis_key)

@pytest.mark.parametrize("errback_task", [errback_old_style, errback_new_style])
def test_mutable_errback_called_by_chord_from_group_fail_multiple_on_body_failure(
self, errback_task, manager, subtests
):
if not manager.app.conf.result_backend.startswith("redis"):
raise pytest.skip("Requires redis result backend.")
redis_connection = get_redis_connection()

fail_task_count = 42
# We have to use failing task signatures with unique task IDs to ensure
# the chord can complete when they are used as part of its header!
fail_sigs = tuple(
fail.s() for _ in range(fail_task_count)
)
fail_sig_ids = tuple(s.freeze().id for s in fail_sigs)
errback = errback_task.s()
# Include a mix of passing and failing tasks
child_sig = group(
*(identity.si(42) for _ in range(8)), # arbitrary task count
*fail_sigs,
)

chord_sig = chord((identity.si(42),), child_sig)
chord_sig.link_error(errback)
for fail_sig_id in fail_sig_ids:
Expand Down

0 comments on commit 89db817

Please sign in to comment.