Skip to content

Commit

Permalink
fix: Preserve call/errbacks of replaced tasks
Browse files Browse the repository at this point in the history
Fixes #6441
  • Loading branch information
maybe-sybr committed May 17, 2021
1 parent 1789097 commit 190aeaf
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 65 deletions.
55 changes: 28 additions & 27 deletions celery/app/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from kombu.exceptions import OperationalError
from kombu.utils.uuid import uuid

from celery import current_app, group, states
from celery import current_app, states
from celery._state import _task_stack
from celery.canvas import _chain, signature
from celery.canvas import _chain, group, signature
from celery.exceptions import (Ignore, ImproperlyConfigured,
MaxRetriesExceededError, Reject, Retry)
from celery.local import class_property
Expand Down Expand Up @@ -893,39 +893,40 @@ def replace(self, sig):
raise ImproperlyConfigured(
"A signature replacing a task must not be part of a chord"
)
if isinstance(sig, _chain) and not getattr(sig, "tasks", True):
raise ImproperlyConfigured("Cannot replace with an empty chain")

# Ensure callbacks or errbacks from the replaced signature are retained
if isinstance(sig, group):
sig |= self.app.tasks['celery.accumulate'].s(index=0).set(
link=self.request.callbacks,
link_error=self.request.errbacks,
)
elif isinstance(sig, _chain):
if not sig.tasks:
raise ImproperlyConfigured(
"Cannot replace with an empty chain"
)

if self.request.chain:
if "link" in sig.options:
final_task_links = sig.tasks[-1].options.setdefault("link", [])
final_task_links.extend(maybe_list(sig.options["link"]))
# Construct the new remainder of the task by chaining the signature
# we're being replaced by with signatures constructed from the
# chain elements in the current request.
for t in reversed(self.request.chain):
sig |= signature(t, app=self.app)

# Groups get uplifted to a chord so that we can link onto the body
sig |= self.app.tasks['celery.accumulate'].s(index=0)
for callback in self.request.callbacks or []:
sig.link(callback)
for errback in self.request.errbacks or []:
sig.link_error(errback)
# If the replacement signature is a chain, we need to push callbacks
# down to the final task so they run at the right time even if we
# proceed to link further tasks from the original request below
if isinstance(sig, _chain) and "link" in sig.options:
final_task_links = sig.tasks[-1].options.setdefault("link", [])
final_task_links.extend(maybe_list(sig.options["link"]))
# We need to freeze the replacement signature with the current task's
# ID to ensure that we don't disassociate it from the existing task IDs
# which would break previously constructed results objects.
sig.freeze(self.request.id)
# Ensure the important options from the original signature are retained
sig.set(
chord=chord,
group_id=self.request.group,
group_index=self.request.group_index,
root_id=self.request.root_id,
)
# We need to freeze the new signature with the current task's ID to
# ensure that we don't disassociate the new signature from the existing
# task IDs which would break previously constructed results objects.
sig.freeze(self.request.id)

# If the task being replaced is part of a chain, we need to re-create
# it with the replacement signature - these subsequent tasks will
# retain their original task IDs as well
for t in reversed(self.request.chain or []):
sig |= signature(t, app=self.app)
# Finally, either apply or delay the new signature!
if self.request.is_eager:
return sig.apply().get()
else:
Expand Down
6 changes: 6 additions & 0 deletions t/integration/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ def fail(*args):
raise ExpectedException(*args)


@shared_task(bind=True)
def fail_replaced(self, *args):
"""Replace this task with one which raises ExpectedException."""
raise self.replace(fail.si(*args))


@shared_task
def chord_error(*args):
return args
Expand Down

0 comments on commit 190aeaf

Please sign in to comment.