Skip to content

Commit

Permalink
The bugfix in PR #7934 created a new bug with nested group stamping o…
Browse files Browse the repository at this point in the history
…n task replace.

This adds a new test case to reproduce it + fix.
New test case: test_replace_group_merge_stamps()
  • Loading branch information
Nusnus committed Dec 1, 2022
1 parent b2f456b commit 437ef18
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 4 deletions.
4 changes: 3 additions & 1 deletion celery/app/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from celery import current_app, states
from celery._state import _task_stack
from celery.canvas import _chain, group, signature
from celery.canvas import GroupStampingVisitor, _chain, group, signature
from celery.exceptions import Ignore, ImproperlyConfigured, MaxRetriesExceededError, Reject, Retry
from celery.local import class_property
from celery.result import EagerResult, denied_join_result
Expand Down Expand Up @@ -954,6 +954,8 @@ def replace(self, sig):
sig |= signature(t, app=self.app)
# Stamping sig with parents groups
if self.request.stamps:
groups = self.request.stamps.get("groups")
sig.stamp(visitor=GroupStampingVisitor(groups=groups, stamped_headers=self.request.stamped_headers))
stamped_headers = self.request.stamped_headers.copy()
stamps = self.request.stamps.copy()
stamped_headers.extend(sig.options.get('stamped_headers', []))
Expand Down
8 changes: 5 additions & 3 deletions t/integration/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from time import sleep

from celery import Signature, Task, chain, chord, group, shared_task
from celery.canvas import StampingVisitor
from celery.canvas import StampingVisitor, signature
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger

Expand Down Expand Up @@ -445,5 +445,7 @@ def replaced_with_me():


@shared_task(bind=True, base=StampedTaskOnReplace)
def replace_with_stamped_task(self: StampedTaskOnReplace):
self.replace(replaced_with_me.s())
def replace_with_stamped_task(self: StampedTaskOnReplace, replace_with=None):
if replace_with is None:
replace_with = replaced_with_me.s()
self.replace(signature(replace_with))
28 changes: 28 additions & 0 deletions t/integration/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3281,3 +3281,31 @@ def on_signature(self, sig, **headers) -> dict:
# stamped_task needs to be stamped with CustomStampingVisitor
# and the replaced task with both CustomStampingVisitor and StampOnReplace
assert assertion_result, 'All of the tasks should have been stamped'

def test_replace_group_merge_stamps(self, manager):
""" Test that replacing a group signature keeps the previous and new group stamps """

x = 5
y = 6

@task_received.connect
def task_received_handler(**kwargs):
request = kwargs['request']
nonlocal assertion_result
nonlocal gid1

assertion_result = all([
assertion_result,
request.stamps['groups'][0] == gid1,
len(request.stamps['groups']) == 2
if any([request.args == [10, x], request.args == [10, y]]) else True
])

sig = add.s(3, 3) | add.s(4) | group(add.s(x), add.s(y))
sig = group(add.s(1, 1), add.s(2, 2), replace_with_stamped_task.s(replace_with=sig))
assertion_result = False
sig.delay()
assertion_result = True
gid1 = sig.options['task_id']
sleep(1)
assert assertion_result, 'Group stamping is corrupted'

0 comments on commit 437ef18

Please sign in to comment.