Skip to content

Commit

Permalink
Stabilized test_revoked_by_headers_complex_canvas (#7877)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nusnus committed Nov 3, 2022
1 parent 720d192 commit f64b337
Showing 1 changed file with 46 additions and 40 deletions.
86 changes: 46 additions & 40 deletions t/integration/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,53 +231,59 @@ def on_signature(self, sig, **headers) -> dict:
# so we let it run last in the suite to avoid
# affecting other tests until we can fix it.
@pytest.mark.order("last")
@pytest.mark.parametrize('monitoring_id', [
"4242",
[1234, uuid4().hex],
])
def test_revoked_by_headers_complex_canvas(self, manager, subtests, monitoring_id):
@flaky
def test_revoked_by_headers_complex_canvas(self, manager, subtests):
"""Testing revoking of task using a stamped header"""
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

target_monitoring_id = isinstance(monitoring_id, list) and monitoring_id[0] or monitoring_id
for monitoring_id in ["4242", [1234, uuid4().hex]]:

class MonitoringIdStampingVisitor(StampingVisitor):
def on_signature(self, sig, **headers) -> dict:
return {'monitoring_id': target_monitoring_id, 'stamped_headers': ['monitoring_id']}

stamped_task = sleeping.si(4)
stamped_task.stamp(visitor=MonitoringIdStampingVisitor())
result = stamped_task.freeze()

canvas = [
group([stamped_task]),
chord(group([stamped_task]), sleeping.si(2)),
chord(group([sleeping.si(2)]), stamped_task),
chain(stamped_task),
group([sleeping.si(2), stamped_task, sleeping.si(2)]),
chord([sleeping.si(2), stamped_task], sleeping.si(2)),
chord([sleeping.si(2), sleeping.si(2)], stamped_task),
chain(sleeping.si(2), stamped_task),
chain(sleeping.si(2), group([sleeping.si(2), stamped_task, sleeping.si(2)])),
chain(sleeping.si(2), group([sleeping.si(2), stamped_task]), sleeping.si(2)),
chain(sleeping.si(2), group([sleeping.si(2), sleeping.si(2)]), stamped_task),
]

result.revoke_by_stamped_headers(headers={'monitoring_id': monitoring_id})

for sig in canvas:
sig_result = sig.apply_async()
with subtests.test(msg='Testing if task was revoked'):
with pytest.raises(celery.exceptions.TaskRevokedError):
sig_result.get()
assert result.status == 'REVOKED'
assert result.ready() is True
assert result.failed() is False
assert result.successful() is False
worker_state.revoked_headers.clear()
# Try to purge the queue before we start
# to attempt to avoid interference from other tests
while True:
count = manager.app.control.purge()
if count == 0:
break

target_monitoring_id = isinstance(monitoring_id, list) and monitoring_id[0] or monitoring_id

class MonitoringIdStampingVisitor(StampingVisitor):
def on_signature(self, sig, **headers) -> dict:
return {'monitoring_id': target_monitoring_id, 'stamped_headers': ['monitoring_id']}

stamped_task = sleeping.si(4)
stamped_task.stamp(visitor=MonitoringIdStampingVisitor())
result = stamped_task.freeze()

canvas = [
group([stamped_task]),
chord(group([stamped_task]), sleeping.si(2)),
chord(group([sleeping.si(2)]), stamped_task),
chain(stamped_task),
group([sleeping.si(2), stamped_task, sleeping.si(2)]),
chord([sleeping.si(2), stamped_task], sleeping.si(2)),
chord([sleeping.si(2), sleeping.si(2)], stamped_task),
chain(sleeping.si(2), stamped_task),
chain(sleeping.si(2), group([sleeping.si(2), stamped_task, sleeping.si(2)])),
chain(sleeping.si(2), group([sleeping.si(2), stamped_task]), sleeping.si(2)),
chain(sleeping.si(2), group([sleeping.si(2), sleeping.si(2)]), stamped_task),
]

result.revoke_by_stamped_headers(headers={'monitoring_id': monitoring_id})

for sig in canvas:
sig_result = sig.apply_async()
with subtests.test(msg='Testing if task was revoked'):
with pytest.raises(celery.exceptions.TaskRevokedError):
sig_result.get()
assert result.status == 'REVOKED'
assert result.ready() is True
assert result.failed() is False
assert result.successful() is False
worker_state.revoked_headers.clear()

@flaky
def test_wrong_arguments(self, manager):
Expand Down

0 comments on commit f64b337

Please sign in to comment.