Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stabilized test_revoked_by_headers_complex_canvas #7877

Merged
merged 1 commit into from
Nov 3, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 46 additions & 40 deletions t/integration/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,53 +231,59 @@ def on_signature(self, sig, **headers) -> dict:
# so we let it run last in the suite to avoid
# affecting other tests until we can fix it.
@pytest.mark.order("last")
@pytest.mark.parametrize('monitoring_id', [
"4242",
[1234, uuid4().hex],
])
def test_revoked_by_headers_complex_canvas(self, manager, subtests, monitoring_id):
@flaky
def test_revoked_by_headers_complex_canvas(self, manager, subtests):
"""Testing revoking of task using a stamped header"""
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

target_monitoring_id = isinstance(monitoring_id, list) and monitoring_id[0] or monitoring_id
for monitoring_id in ["4242", [1234, uuid4().hex]]:

class MonitoringIdStampingVisitor(StampingVisitor):
def on_signature(self, sig, **headers) -> dict:
return {'monitoring_id': target_monitoring_id, 'stamped_headers': ['monitoring_id']}

stamped_task = sleeping.si(4)
stamped_task.stamp(visitor=MonitoringIdStampingVisitor())
result = stamped_task.freeze()

canvas = [
group([stamped_task]),
chord(group([stamped_task]), sleeping.si(2)),
chord(group([sleeping.si(2)]), stamped_task),
chain(stamped_task),
group([sleeping.si(2), stamped_task, sleeping.si(2)]),
chord([sleeping.si(2), stamped_task], sleeping.si(2)),
chord([sleeping.si(2), sleeping.si(2)], stamped_task),
chain(sleeping.si(2), stamped_task),
chain(sleeping.si(2), group([sleeping.si(2), stamped_task, sleeping.si(2)])),
chain(sleeping.si(2), group([sleeping.si(2), stamped_task]), sleeping.si(2)),
chain(sleeping.si(2), group([sleeping.si(2), sleeping.si(2)]), stamped_task),
]

result.revoke_by_stamped_headers(headers={'monitoring_id': monitoring_id})

for sig in canvas:
sig_result = sig.apply_async()
with subtests.test(msg='Testing if task was revoked'):
with pytest.raises(celery.exceptions.TaskRevokedError):
sig_result.get()
assert result.status == 'REVOKED'
assert result.ready() is True
assert result.failed() is False
assert result.successful() is False
worker_state.revoked_headers.clear()
# Try to purge the queue before we start
# to attempt to avoid interference from other tests
while True:
count = manager.app.control.purge()
if count == 0:
break

target_monitoring_id = isinstance(monitoring_id, list) and monitoring_id[0] or monitoring_id

class MonitoringIdStampingVisitor(StampingVisitor):
def on_signature(self, sig, **headers) -> dict:
return {'monitoring_id': target_monitoring_id, 'stamped_headers': ['monitoring_id']}

stamped_task = sleeping.si(4)
stamped_task.stamp(visitor=MonitoringIdStampingVisitor())
result = stamped_task.freeze()

canvas = [
group([stamped_task]),
chord(group([stamped_task]), sleeping.si(2)),
chord(group([sleeping.si(2)]), stamped_task),
chain(stamped_task),
group([sleeping.si(2), stamped_task, sleeping.si(2)]),
chord([sleeping.si(2), stamped_task], sleeping.si(2)),
chord([sleeping.si(2), sleeping.si(2)], stamped_task),
chain(sleeping.si(2), stamped_task),
chain(sleeping.si(2), group([sleeping.si(2), stamped_task, sleeping.si(2)])),
chain(sleeping.si(2), group([sleeping.si(2), stamped_task]), sleeping.si(2)),
chain(sleeping.si(2), group([sleeping.si(2), sleeping.si(2)]), stamped_task),
]

result.revoke_by_stamped_headers(headers={'monitoring_id': monitoring_id})

for sig in canvas:
sig_result = sig.apply_async()
with subtests.test(msg='Testing if task was revoked'):
with pytest.raises(celery.exceptions.TaskRevokedError):
sig_result.get()
assert result.status == 'REVOKED'
assert result.ready() is True
assert result.failed() is False
assert result.successful() is False
worker_state.revoked_headers.clear()

@flaky
def test_wrong_arguments(self, manager):
Expand Down