
default_channel - call to ensure_connection() loses configuration information #735

Closed
okrutny opened this issue May 15, 2017 · 12 comments

@okrutny

okrutny commented May 15, 2017

We use Kombu together with Celery. Celery provides some configuration options to Kombu, like "max_retries". This worked well with the previous version (4.0.2 on pip), where the default_channel property (https://github.com/celery/kombu/blob/master/kombu/connection.py#L807) was implemented like this:

    self.connection
    if self._default_channel is None:
        self._default_channel = self.channel()
    return self._default_channel

Now it's implemented like this:

    self.ensure_connection()
    if self._default_channel is None:
        self._default_channel = self.channel()
    return self._default_channel

This way, config options like max_retries are provided by Celery but not respected by Kombu, because the call to ensure_connection() is made without any arguments.
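
For illustration, here is a minimal standalone sketch (not from the original report; the exact exception type may vary by Kombu version) of the difference in behaviour when the broker is unreachable:

# Minimal sketch, assuming a broker that is currently unreachable.
from kombu import Connection
from kombu.exceptions import OperationalError

conn = Connection('amqp://guest@localhost//')

# With explicit limits, ensure_connection() eventually gives up and raises:
try:
    conn.ensure_connection(max_retries=3, interval_start=0, interval_max=0.2)
except OperationalError as exc:
    print('broker unreachable: %r' % exc)

# With no arguments, as default_channel now calls it, the same method retries
# indefinitely, so anything that touches default_channel appears to hang:
# conn.ensure_connection()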

@georgepsarakis
Contributor

Your observation seems correct. The change was introduced in #724.

@etiktin

etiktin commented Sep 5, 2017

@georgepsarakis any updates on this?

I'm encountering the same issue using Celery 4.1.0 and Kombu 4.1.0.

The scenario I'm seeing is that if I try to call a task while RabbitMQ is down, the call gets stuck forever (at least it seems that way), ignoring my retry_policy or retry=False.
If I patch the default_channel code to use self.connection instead of self.ensure_connection(), an exception is raised as expected (note that reverting the code brings back celery/celery#3921).


Here's a minimal Python 2.7 example to reproduce the issue:

# tasks.py
import time

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')


@app.task
def some_long_computation():
    print('In some_long_computation, about to sleep for 2 seconds')
    time.sleep(2)
    print('Exiting some_long_computation')

if __name__ == '__main__':
    print('>>> About to call some_long_computation task')
    some_long_computation.apply_async(retry=False)
    print('>>> After calling some_long_computation task')

Run the worker:

celery -A tasks worker --loglevel=info

Execute the task in another shell/session:

python tasks.py

The task will complete and exit successfully.

Now, stop RabbitMQ using:

sudo service rabbitmq-server stop

Execute the task again and you will see that it gets stuck, even though the call passed retry=False and should have raised an exception.
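
For reference, here is what I would expect the retry=False call to do once fixed (a sketch; the exact exception type depends on the Celery and Kombu versions in use):

# Sketch of the expected behaviour with retry=False while the broker is down.
from kombu.exceptions import OperationalError

from tasks import some_long_computation

try:
    some_long_computation.apply_async(retry=False)
except OperationalError as exc:
    print('Broker unavailable and retry is disabled: %r' % exc)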

@etiktin

etiktin commented Sep 6, 2017

As a temporary fix, I wrote the following monkey patch:

# License: New BSD (same as Kombu)

def monkey_patch_kombu(retry=True, retry_policy=None):
    """
    Applies a fix for producer being stuck forever when trying to publish a
    message. See details at: https://github.com/celery/kombu/issues/735
    :param bool retry: decides if publishing task messages will be retried
    in the case of connection loss or other connection errors (see
    `task_publish_retry` in Celery's docs)
    :param dict retry_policy: defines the default policy when retrying
    publishing a task message in the case of connection loss or other
    connection errors (see `task_publish_retry_policy` in Celery's docs)
    """
    import kombu
    assert kombu.__version__ == '4.1.0', 'Check if patch is still needed'
    from kombu import Connection

    if retry_policy is None:
        retry_policy = dict(
            max_retries=3, interval_start=0,
            interval_step=0.2, interval_max=0.2)

    if not retry or retry_policy['max_retries'] == 0:
        # Disable retries
        # Note: we use -1 instead of 0, because the retry logic in
        # kombu/utils/functional.py `retry_over_time` function checks if
        # max_retries is "truthy" before checking if the current number of
        # retries passed max_retries, so 0 won't work, but -1 will
        retry_policy['max_retries'] = -1

    @property
    def patched_default_channel(self):
        self.ensure_connection(**retry_policy)

        if self._default_channel is None:
            self._default_channel = self.channel()
        return self._default_channel

    # Patch/replace the connection module default_channel property
    Connection.default_channel = patched_default_channel

Usage: call this function once, before publishing, and your retry policy should be respected (if none is provided, the default is used, i.e. up to 3 retries).
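
For example (a sketch; the module name kombu_patch is a placeholder for wherever you saved the function above):

# Apply the patch once, before any task is published.
from kombu_patch import monkey_patch_kombu

monkey_patch_kombu(retry=True, retry_policy={
    'max_retries': 3, 'interval_start': 0,
    'interval_step': 0.2, 'interval_max': 0.2})

from tasks import some_long_computation

# With the broker down, this now fails after ~3 retries instead of hanging.
some_long_computation.apply_async()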

@georgepsarakis
Contributor

@okrutny @etiktin do you think this might be solved by #779?

@etiktin

etiktin commented Sep 7, 2017

@georgepsarakis the code in #779 doesn't fix this issue. It addresses a similar situation, but in other areas of the code.

Here's the call stack up to the call to default_channel when retry==False:

File "tasks.py", line 58, in <module>
    some_long_computation.apply_async(retry=False)
File ".../python2.7/site-packages/celery/app/task.py", line 536, in apply_async
    **options
File ".../python2.7/site-packages/celery/app/base.py", line 737, in send_task
    amqp.send_task_message(P, name, message, **options)
File ".../python2.7/site-packages/celery/app/amqp.py", line 554, in send_task_message
    **properties
File ".../python2.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
File ".../python2.7/site-packages/kombu/messaging.py", line 187, in _publish
    channel = self.channel
File ".../python2.7/site-packages/kombu/messaging.py", line 209, in _get_channel
    channel = self._channel = channel()
File ".../python2.7/site-packages/kombu/utils/functional.py", line 38, in __call__
    value = self.__value__ = self.__contract__()
File ".../python2.7/site-packages/kombu/messaging.py", line 224, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)

Here's the call stack up to the call to default_channel when retry==True:

File "tasks.py", line 58, in <module>
    some_long_computation.apply_async(retry=True)
File ".../python2.7/site-packages/celery/app/task.py", line 536, in apply_async
    **options
File ".../python2.7/site-packages/celery/app/base.py", line 737, in send_task
    amqp.send_task_message(P, name, message, **options)
File ".../python2.7/site-packages/celery/app/amqp.py", line 554, in send_task_message
    **properties
File ".../python2.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
File ".../python2.7/site-packages/kombu/connection.py", line 495, in _ensured
    return fun(*args, **kwargs)
File ".../python2.7/site-packages/kombu/messaging.py", line 187, in _publish
    channel = self.channel
File ".../python2.7/site-packages/kombu/messaging.py", line 209, in _get_channel
    channel = self._channel = channel()
File ".../python2.7/site-packages/kombu/utils/functional.py", line 38, in __call__
    value = self.__value__ = self.__contract__()
File ".../python2.7/site-packages/kombu/messaging.py", line 224, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)

They diverge in Producer.publish. That method accepts retry and retry_policy, but neither is saved anywhere, so by the time both paths reach the Connection.default_channel property, that information is lost and Connection.ensure_connection is called without the policy, ignoring the retry flag (if retry is False, you can encode that in the retry_policy by passing max_retries=-1; see the example in my previous comment). So it seems we need to save the retry flag and the policy and pass them along from the producer to the connection, so that default_channel can use them in the call to ensure_connection.
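
One possible direction (a sketch only, not necessarily what was eventually merged; the transport_options keys below are assumptions) would be to let default_channel read retry settings from the connection's transport_options, which both Celery and plain Kombu users can populate:

# Sketch: make default_channel honour retry settings from transport_options.
@property
def default_channel(self):
    conn_opts = {}
    transport_opts = self.transport_options or {}
    for key in ('max_retries', 'interval_start',
                'interval_step', 'interval_max'):
        if key in transport_opts:
            conn_opts[key] = transport_opts[key]
    self.ensure_connection(**conn_opts)
    if self._default_channel is None:
        self._default_channel = self.channel()
    return self._default_channel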

@mgan59

mgan59 commented Sep 12, 2017

We are in the process of upgrading to Celery 4.1 and believe we may have run into this issue. We have an internal abstraction that wraps up our dispatching, and previously, with Celery 3.x, we would catch the ConnectionError exceptions thrown by Redis. This code now hangs when Redis is turned off, with no exceptions being raised. We can make adjustments on our end to prevent this from being an issue, but having it fixed would be helpful :)

@auvipy
Member

auvipy commented Jan 13, 2018

what is the resolution in master for this issue?

@paultiplady

paultiplady commented Jan 16, 2018

I can confirm that Celery 4.0.0 through 4.1.0 are susceptible to this issue (since they all pull in the latest Kombu). Manually pinning Kombu to <=4.1.0 works around this problem.

As far as I can tell this is a critical bug: it will block the producer thread when the broker goes down, until the broker comes back up, which will cause an outage if that thread is meant to be serving API requests (as is the standard use case for Celery).

There's probably a more correct long-term fix to make (inheriting the retry policy correctly), but I successfully fixed this locally by making the following one-character fix to set max_retries to something safe:

# The only change: max_retries now defaults to 3 instead of retrying forever.
def ensure_connection(self, errback=None, max_retries=3,
                      interval_start=2, interval_step=2, interval_max=30,
                      callback=None, reraise_as_library_errors=True):

(Infinite retries don't seem like a sensible default anywhere in the system, even if the intention is that the config will be overridden by something else.)
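
As a side note, here is a simplified model (not Kombu's actual code) of the retry loop's stopping condition described in the monkey-patch comment above, which is why max_retries=None or 0 means retrying forever while -1 effectively disables retries:

# Simplified, illustrative model of the stopping condition in the retry loop.
def retry_until_limit(fun, catch, max_retries=None):
    retries = 0
    while True:
        try:
            return fun()
        except catch:
            # The limit is only enforced when max_retries is truthy, so both
            # None and 0 keep retrying forever; -1 makes it give up at once.
            if max_retries and retries >= max_retries:
                raise
            retries += 1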

@georgepsarakis
Contributor

@paultiplady this should be fixed on master by #769.

auvipy closed this as completed Jan 16, 2018
@mlorant

mlorant commented May 22, 2018

Is the patch still needed with kombu 4.2.0? It seems to work without it, but I just want to be 200% sure... :)

@georgepsarakis
Contributor

@mlorant sorry, I did not quite understand the question. If you look at the commit history, the fix for this issue is included in 4.2.0.

@mlorant

mlorant commented May 22, 2018

Alright, I was in a hurry when posting, sorry. I wanted to mention @etiktin and their monkey patch in #735 (comment).

The 4.2.0 release notes do not explicitly list this issue as resolved, but since the fix was merged and the issue closed a long time ago, I should have guessed it was included. Thanks for the confirmation though 👍

celery locked as resolved and limited conversation to collaborators May 23, 2018