Skip to content

Commit

Permalink
Fix worker crash on un-pickleable exceptions (#8133)
Browse files Browse the repository at this point in the history
* Fix worker crash on unpickleable exceptions

* Move logic to wrap unpicklable exception into the Retry class (revert modifications to handle_retry)

* Add test and fix handle_ignore not representing the wrapped exception correctly

---------

Co-authored-by: Alessio Bogon <alessio.bogon@b2c2.com>
  • Loading branch information
youtux and alessio-b2c2 committed Mar 23, 2023
1 parent 347553f commit ab34d34
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 3 deletions.
2 changes: 1 addition & 1 deletion celery/app/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def handle_failure(self, task, req, store_errors=True, call_errbacks=True):
exc = self.retval
# make sure we only send pickleable exceptions back to parent.
einfo = ExceptionInfo()
einfo.exception = get_pickleable_exception(einfo.exception)
einfo.exception.exc = get_pickleable_exception(einfo.exception.exc)
einfo.type = get_pickleable_etype(einfo.type)

task.backend.mark_as_failure(
Expand Down
4 changes: 3 additions & 1 deletion celery/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
'CeleryCommandException',
)

from celery.utils.serialization import get_pickleable_exception

UNREGISTERED_FMT = """\
Task of kind {0} never registered, please make sure it's imported.\
"""
Expand Down Expand Up @@ -160,7 +162,7 @@ def __init__(self, message=None, exc=None, when=None, is_eager=False,
if isinstance(exc, str):
self.exc, self.excs = None, exc
else:
self.exc, self.excs = exc, safe_repr(exc) if exc else None
self.exc, self.excs = get_pickleable_exception(exc), safe_repr(exc) if exc else None
self.when = when
self.is_eager = is_eager
self.sig = sig
Expand Down
4 changes: 3 additions & 1 deletion celery/utils/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ class UnpickleableExceptionWrapper(Exception):
exc_args = None

def __init__(self, exc_module, exc_cls_name, exc_args, text=None):
safe_exc_args = ensure_serializable(exc_args, pickle.dumps)
safe_exc_args = ensure_serializable(
exc_args, lambda v: pickle.loads(pickle.dumps(v))
)
self.exc_module = exc_module
self.exc_cls_name = exc_cls_name
self.exc_args = safe_exc_args
Expand Down
30 changes: 30 additions & 0 deletions t/unit/tasks/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from celery.contrib.testing.mocks import ContextMock
from celery.exceptions import Ignore, ImproperlyConfigured, Retry
from celery.result import AsyncResult, EagerResult
from celery.utils.serialization import UnpickleableExceptionWrapper

try:
from urllib.error import HTTPError
Expand Down Expand Up @@ -215,6 +216,13 @@ def retry_task_customexc(self, arg1, arg2, kwarg=1, **kwargs):

self.retry_task_customexc = retry_task_customexc

@self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
def retry_task_unpickleable_exc(self, foo, bar):
self.iterations += 1
raise self.retry(countdown=0, exc=UnpickleableException(foo, bar))

self.retry_task_unpickleable_exc = retry_task_unpickleable_exc

@self.app.task(bind=True, autoretry_for=(ZeroDivisionError,),
shared=False)
def autoretry_task_no_kwargs(self, a, b):
Expand Down Expand Up @@ -389,6 +397,13 @@ class MyCustomException(Exception):
"""Random custom exception."""


class UnpickleableException(Exception):
"""Exception that doesn't survive a pickling roundtrip (dump + load)."""
def __init__(self, foo, bar):
super().__init__(foo)
self.bar = bar


class test_task_retries(TasksCase):

def test_retry(self):
Expand Down Expand Up @@ -540,6 +555,21 @@ def test_retry_with_custom_exception(self):
result.get()
assert self.retry_task_customexc.iterations == 3

def test_retry_with_unpickleable_exception(self):
self.retry_task_unpickleable_exc.max_retries = 2
self.retry_task_unpickleable_exc.iterations = 0

result = self.retry_task_unpickleable_exc.apply(
["foo", "bar"]
)
with pytest.raises(UnpickleableExceptionWrapper) as exc_info:
result.get()

assert self.retry_task_unpickleable_exc.iterations == 3

exc_wrapper = exc_info.value
assert exc_wrapper.exc_args == ("foo", )

def test_max_retries_exceeded(self):
self.retry_task.max_retries = 2
self.retry_task.iterations = 0
Expand Down

0 comments on commit ab34d34

Please sign in to comment.