
Celery 4 worker can't connect to RabbitMQ broker failover #3921

Closed
draskomikic opened this issue Mar 17, 2017 · 36 comments

@draskomikic

draskomikic commented Mar 17, 2017

I have 3 RabbitMQ nodes in a cluster in HA mode. Each node runs in a separate Docker container.

I have used this command to set HA policy:
rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
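(Note: the empty pattern "" passed to set_policy is a regular expression, and an empty regex matches every queue name, so this mirrors all queues. A more explicit, equivalent form, assuming the same default vhost, would be:)

```shell
# Match-all pattern spelled out explicitly instead of the empty regex.
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
```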

Celery config looks like this:

CELERY = dict(
    broker_url=[
        'amqp://guest@rabbitmq1:5672',
        'amqp://guest@rabbitmq2:5672',
        'amqp://guest@rabbitmq3:5672',
    ],
    celery_queue_ha_policy='all',
    accept_content=['json'],
    task_serializer='json',
    result_serializer='json',
    task_default_queue='default',
    task_queues=(
        Queue('default', Exchange('default')),
        Queue('preprocessor', Exchange('preprocessor')),
        Queue('vcp', Exchange('vcp')),
        Queue('alpha', Exchange('alpha')),
    ),
    task_routes={
        'myapp.worker.tasks.preprocess': {'queue': 'preprocessor'},
        'myapp.worker.tasks.vcp': {'queue': 'vcp'},
        'myapp.worker.tasks.alpha': {'queue': 'alpha'},
    },
    imports=[
        'myapp.worker.tasks',
    ]
)
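When broker_url is given as a list like this, Celery (via kombu) rotates over the URLs according to its failover strategy, which defaults to a simple round-robin. A minimal sketch of that rotation, reusing the host names above (this is the idea only, not Celery's actual implementation):

```python
from itertools import cycle

# Hypothetical broker list mirroring the config above.
brokers = [
    'amqp://guest@rabbitmq1:5672',
    'amqp://guest@rabbitmq2:5672',
    'amqp://guest@rabbitmq3:5672',
]

# Round-robin failover simply cycles through the list: each failed
# connection attempt moves on to the next URL, wrapping around forever.
failover = cycle(brokers)
first, second = next(failover), next(failover)
```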

Everything works fine until I stop the master RabbitMQ application in order to test Celery's failover feature, using the command:
rabbitmqctl stop_app

Immediately after the RabbitMQ application is stopped I start seeing the errors in the log below. The frequency of the log messages is very high and it does not slow down with the number of attempts.

According to the logs, Celery tries to reconnect using the next failover, but it gets interrupted by another attempt to reconnect to the node that was stopped. The same thing happens over and over, like an infinite loop.

[2017-03-17 15:10:28,084: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-03-17 15:10:28,300: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq1', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-03-17 15:10:28,302: DEBUG/MainProcess] ^-- substep ok
[2017-03-17 15:10:28,303: DEBUG/MainProcess] | Consumer: Starting Mingle
[2017-03-17 15:10:28,303: INFO/MainProcess] mingle: searching for neighbors
[2017-03-17 15:10:28,303: DEBUG/MainProcess] using channel_id: 1
[2017-03-17 15:10:28,318: DEBUG/MainProcess] Channel open
[2017-03-17 15:10:28,470: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/usr/local/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 38, in start
    self.sync(c)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 42, in sync
    replies = self.send_hello(c)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 55, in send_hello
    replies = inspect.hello(c.hostname, our_revoked._data) or {}
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 129, in hello
    return self._request('hello', from_node=from_node, revoked=revoked)
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 81, in _request
    timeout=self.timeout, reply=True,
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 436, in broadcast
    limit, callback, channel=channel,
  File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 315, in _broadcast
    serializer=serializer)
  File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 290, in _publish
    serializer=serializer,
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 187, in _publish
    channel = self.channel
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 209, in _get_channel
    channel = self._channel = channel()
  File "/usr/local/lib/python2.7/site-packages/kombu/utils/functional.py", line 38, in __call__
    value = self.__value__ = self.__contract__()
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 224, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 819, in default_channel
    self.connection
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 802, in connection
    self._connection = self._establish_connection()
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 757, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 130, in establish_connection
    conn.connect()
  File "/usr/local/lib/python2.7/site-packages/amqp/connection.py", line 294, in connect
    self.transport.connect()
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 120, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 161, in _connect
    self.sock.connect(sa)
  File "/usr/local/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
[2017-03-17 15:10:28,508: DEBUG/MainProcess] Closed channel #1
[2017-03-17 15:10:28,570: DEBUG/MainProcess] | Consumer: Restarting event loop...
[2017-03-17 15:10:28,572: DEBUG/MainProcess] | Consumer: Restarting Gossip...
[2017-03-17 15:10:28,575: DEBUG/MainProcess] | Consumer: Restarting Heart...
[2017-03-17 15:10:28,648: DEBUG/MainProcess] | Consumer: Restarting Control...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] | Consumer: Restarting Tasks...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] Canceling task consumer...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] | Consumer: Restarting Mingle...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] | Consumer: Restarting Events...
[2017-03-17 15:10:28,672: DEBUG/MainProcess] | Consumer: Restarting Connection...
[2017-03-17 15:10:28,673: DEBUG/MainProcess] | Consumer: Starting Connection
[2017-03-17 15:10:28,947: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-03-17 15:10:29,345: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq1', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-03-17 15:10:29,506: INFO/MainProcess] Connected to amqp://guest:**@rabbitmq2:5672//
[2017-03-17 15:10:29,535: DEBUG/MainProcess] ^-- substep ok
[2017-03-17 15:10:29,569: DEBUG/MainProcess] | Consumer: Starting Events
[2017-03-17 15:10:29,682: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-03-17 15:10:29,740: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq1', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-03-17 15:10:29,768: DEBUG/MainProcess] ^-- substep ok
[2017-03-17 15:10:29,770: DEBUG/MainProcess] | Consumer: Starting Mingle
[2017-03-17 15:10:29,770: INFO/MainProcess] mingle: searching for neighbors
[2017-03-17 15:10:29,771: DEBUG/MainProcess] using channel_id: 1
[2017-03-17 15:10:29,795: DEBUG/MainProcess] Channel open
[2017-03-17 15:10:29,874: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/usr/local/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 38, in start
    self.sync(c)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 42, in sync
    replies = self.send_hello(c)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 55, in send_hello
    replies = inspect.hello(c.hostname, our_revoked._data) or {}
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 129, in hello
    return self._request('hello', from_node=from_node, revoked=revoked)
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 81, in _request
    timeout=self.timeout, reply=True,
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 436, in broadcast
    limit, callback, channel=channel,
  File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 315, in _broadcast
    serializer=serializer)
  File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 290, in _publish
    serializer=serializer,
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 187, in _publish
    channel = self.channel
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 209, in _get_channel
    channel = self._channel = channel()
  File "/usr/local/lib/python2.7/site-packages/kombu/utils/functional.py", line 38, in __call__
    value = self.__value__ = self.__contract__()
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 224, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 819, in default_channel
    self.connection
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 802, in connection
    self._connection = self._establish_connection()
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 757, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 130, in establish_connection
    conn.connect()
  File "/usr/local/lib/python2.7/site-packages/amqp/connection.py", line 294, in connect
    self.transport.connect()
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 120, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 161, in _connect
    self.sock.connect(sa)
  File "/usr/local/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
[2017-03-17 15:10:29,887: DEBUG/MainProcess] Closed channel #1
[2017-03-17 15:10:29,907: DEBUG/MainProcess] | Consumer: Restarting event loop...
[2017-03-17 15:10:29,908: DEBUG/MainProcess] | Consumer: Restarting Gossip...
[2017-03-17 15:10:29,908: DEBUG/MainProcess] | Consumer: Restarting Heart...
[2017-03-17 15:10:29,908: DEBUG/MainProcess] | Consumer: Restarting Control...
[2017-03-17 15:10:29,909: DEBUG/MainProcess] | Consumer: Restarting Tasks...
[2017-03-17 15:10:29,910: DEBUG/MainProcess] Canceling task consumer...
[2017-03-17 15:10:29,911: DEBUG/MainProcess] | Consumer: Restarting Mingle...
[2017-03-17 15:10:29,912: DEBUG/MainProcess] | Consumer: Restarting Events...
[2017-03-17 15:10:29,953: DEBUG/MainProcess] | Consumer: Restarting Connection...
[2017-03-17 15:10:29,954: DEBUG/MainProcess] | Consumer: Starting Connection
[2017-03-17 15:10:30,036: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

pip list:

...
amqp (2.1.4)
celery (4.0.2)
flower (0.9.1)
kombu (4.0.2)
pep8 (1.7.0)
pip (9.0.1)
setuptools (34.3.0)
six (1.10.0)
...
@draskomikic draskomikic changed the title Celery worker can't connect to RabbitMQ broker failover Celery 4 worker can't connect to RabbitMQ broker failover Mar 17, 2017
@csterk

csterk commented Mar 30, 2017

I have this same issue.

@georgepsarakis georgepsarakis self-assigned this Mar 30, 2017
@georgepsarakis
Contributor

georgepsarakis commented Apr 7, 2017

@csterk or @draskomikic could you try the following patch please?

Change this line of the Inspect._request method to:

connection=None,

If this works we could then start working on a proper way to fix this bug.

@draskomikic
Author

Hi George,

I have tried setting *connection=None* like you proposed, but no luck; the same issue happens.

I have also tried setting the Celery configuration parameter broker_connection_max_retries=3 (the default is 100), but again the same issue occurs; it looks like it has no effect on this problem.

I have also added one line to the amqp/transport.py library to print out the node in use:
print("---------------------NOW USING NODE: {}".format(self.host))

The line is added before:
https://github.com/celery/py-amqp/blob/master/amqp/transport.py#L120

According to the logs, it seems that Celery has a conflict between connecting to the failover node and reconnecting to the node that went down. More precisely, it looks like trying to reconnect to the master node interrupts the connection to the failover node.

You can see the Celery log output when I stop the rabbitmq1 RabbitMQ node:

[2017-04-07 11:29:35,245: WARNING/MainProcess] ---------------------NOW USING NODE: rabbitmq1
[2017-04-07 11:29:35,276: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-04-07 11:29:35,334: WARNING/MainProcess] ---------------------NOW USING NODE: rabbitmq2
[2017-04-07 11:29:35,378: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq2', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-04-07 11:29:35,386: INFO/MainProcess] Connected to amqp://guest:**@rabbitmq2:5672//
[2017-04-07 11:29:35,392: DEBUG/MainProcess] ^-- substep ok
[2017-04-07 11:29:35,396: DEBUG/MainProcess] | Consumer: Starting Events
[2017-04-07 11:29:35,426: WARNING/MainProcess] ---------------------NOW USING NODE: rabbitmq1
[2017-04-07 11:29:35,452: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-04-07 11:29:35,551: WARNING/MainProcess] ---------------------NOW USING NODE: rabbitmq2
[2017-04-07 11:29:35,567: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq2', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-04-07 11:29:35,582: DEBUG/MainProcess] ^-- substep ok
[2017-04-07 11:29:35,587: DEBUG/MainProcess] | Consumer: Starting Mingle
[2017-04-07 11:29:35,588: INFO/MainProcess] mingle: searching for neighbors
[2017-04-07 11:29:35,640: WARNING/MainProcess] ---------------------NOW USING NODE: rabbitmq1
[2017-04-07 11:29:35,666: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/usr/local/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 38, in start
    self.sync(c)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 42, in sync
    replies = self.send_hello(c)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 55, in send_hello
    replies = inspect.hello(c.hostname, our_revoked._data) or {}
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 121, in hello
    return self._request('hello', from_node=from_node, revoked=revoked)
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 74, in _request
    return self._prepare(self.app.control.broadcast(command, arguments=kwargs, destination=self.destination, callback=self.callback, connection=None, limit=self.limit, timeout=self.timeout, reply=True,))
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 428, in broadcast
    limit, callback, channel=channel,
  File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 304, in _broadcast
    chan = channel or self.connection.default_channel
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 819, in default_channel
    self.connection
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 802, in connection
    self._connection = self._establish_connection()
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 757, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 130, in establish_connection
    conn.connect()
  File "/usr/local/lib/python2.7/site-packages/amqp/connection.py", line 294, in connect
    self.transport.connect()
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 121, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 162, in _connect
    self.sock.connect(sa)
  File "/usr/local/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
[2017-04-07 11:29:35,673: DEBUG/MainProcess] | Consumer: Restarting event loop...
[2017-04-07 11:29:35,681: DEBUG/MainProcess] | Consumer: Restarting Gossip...
[2017-04-07 11:29:35,683: DEBUG/MainProcess] | Consumer: Restarting Heart...
[2017-04-07 11:29:35,684: DEBUG/MainProcess] | Consumer: Restarting Control...
[2017-04-07 11:29:35,685: DEBUG/MainProcess] | Consumer: Restarting Tasks...
[2017-04-07 11:29:35,687: DEBUG/MainProcess] Canceling task consumer...
[2017-04-07 11:29:35,688: DEBUG/MainProcess] | Consumer: Restarting Mingle...
[2017-04-07 11:29:35,694: DEBUG/MainProcess] | Consumer: Restarting Events...
[2017-04-07 11:29:35,700: DEBUG/MainProcess] | Consumer: Restarting Connection...
[2017-04-07 11:29:35,702: DEBUG/MainProcess] | Consumer: Starting Connection
[2017-04-07 11:29:35,749: WARNING/MainProcess] ---------------------NOW USING NODE: rabbitmq1
[2017-04-07 11:29:35,767: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-04-07 11:29:35,809: WARNING/MainProcess] ---------------------NOW USING NODE: rabbitmq2
[2017-04-07 11:29:35,826: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq2', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-04-07 11:29:35,850: INFO/MainProcess] Connected to amqp://guest:**@rabbitmq2:5672//
[2017-04-07 11:29:35,853: DEBUG/MainProcess] ^-- substep ok
[2017-04-07 11:29:35,863: DEBUG/MainProcess] | Consumer: Starting Events
[2017-04-07 11:29:35,890: WARNING/MainProcess] ---------------------NOW USING NODE: rabbitmq1
[2017-04-07 11:29:35,926: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-04-07 11:29:35,965: WARNING/MainProcess] ---------------------NOW USING NODE: rabbitmq2
[2017-04-07 11:29:35,986: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq2', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-04-07 11:29:35,990: DEBUG/MainProcess] ^-- substep ok
[2017-04-07 11:29:35,991: DEBUG/MainProcess] | Consumer: Starting Mingle
[2017-04-07 11:29:35,991: INFO/MainProcess] mingle: searching for neighbors
[2017-04-07 11:29:36,013: WARNING/MainProcess] ---------------------NOW USING NODE: rabbitmq1
[2017-04-07 11:29:36,024: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...

@georgepsarakis
Contributor

@draskomikic did you try starting the worker with the --without-mingle option? This is still a bug (I think), but it may serve as a temporary workaround for some cases.
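For anyone wanting to try this, the workaround is just an extra flag on the worker command line; a hypothetical invocation (the myapp module name is a placeholder for your actual app) would be:

```shell
# Start a worker without the mingle bootstep; replace myapp with your app module.
celery -A myapp worker -l info --without-mingle
```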

@draskomikic
Author

Hi George,

I have run some basic tests using the --without-mingle option and it seems to work just fine. I can see in the logs that the Celery workers immediately connect to the next RabbitMQ node and are able to continue consuming tasks from the second RabbitMQ node.

Thank you very much for all the help; let me know if I can help you with fixing this bug in any way.

@georgepsarakis
Contributor

georgepsarakis commented Apr 10, 2017

@draskomikic great 😄 , happy to help! Yes, I believe we need to solve this. I will give it some thought and let you know.

@thedrow
Member

thedrow commented Apr 16, 2017

@georgepsarakis Great job troubleshooting this. Have you thought about how to fix this?

Btw, can you use the labels to triage issues? It makes it easier for others to filter them based on their expertise.

@georgepsarakis
Contributor

@thedrow you mean adding the Component: * labels? If yes, then you are right, I should be doing this more often, sorry.

I haven't yet worked out how we can fix this, as it goes beyond my understanding of the project internals. I will have another look and let you know if I have any suggestions.

@monitorius

I have exactly the same problem, and --without-mingle fixed it. Thanks for the workaround.

@monitorius

Looks like it goes a bit deeper. The same issue exists for the task producer: if it's connected to a RabbitMQ node and that node dies, the producer is unable to establish a stable connection to the failover node.
I see an infinite log of something like this on the producer:

DEBUG 2017-04-19 14:49:43,263 73960 amqp Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@mondev2', 'platform': 'Erlang/OTP', 'version': '3.6.1-yandex0'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
DEBUG 2017-04-19 14:49:43,314 73960 amqp using channel_id: 1
DEBUG 2017-04-19 14:49:43,340 73960 amqp Channel open
DEBUG 2017-04-19 14:49:43,343 73960 amqp Closed channel #1
DEBUG 2017-04-19 14:49:43,401 73960 amqp Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@mondev2', 'platform': 'Erlang/OTP', 'version': '3.6.1-yandex0'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
DEBUG 2017-04-19 14:49:43,450 73960 amqp using channel_id: 1
DEBUG 2017-04-19 14:49:43,476 73960 amqp Channel open
DEBUG 2017-04-19 14:49:43,477 73960 amqp Closed channel #1

And on the failover RabbitMQ node:

=WARNING REPORT==== 19-Apr-2017::14:51:29 ===
closing AMQP connection <0.6913.1> ([2A02:6B8:0:1A71::2953]:42490 -> [2A02:6B8:C01:304:0:639:0:1EC]:5672):
client unexpectedly closed TCP connection

=INFO REPORT==== 19-Apr-2017::14:51:29 ===
accepting AMQP connection <0.6924.1> ([2A02:6B8:0:1A71::2953]:42491 -> [2A02:6B8:C01:304:0:639:0:1EC]:5672)

=WARNING REPORT==== 19-Apr-2017::14:51:29 ===
closing AMQP connection <0.6924.1> ([2A02:6B8:0:1A71::2953]:42491 -> [2A02:6B8:C01:304:0:639:0:1EC]:5672):
client unexpectedly closed TCP connection

=INFO REPORT==== 19-Apr-2017::14:51:29 ===
accepting AMQP connection <0.6935.1> ([2A02:6B8:0:1A71::2953]:42492 -> [2A02:6B8:C01:304:0:639:0:1EC]:5672)

=WARNING REPORT==== 19-Apr-2017::14:51:29 ===
closing AMQP connection <0.6935.1> ([2A02:6B8:0:1A71::2953]:42492 -> [2A02:6B8:C01:304:0:639:0:1EC]:5672):
client unexpectedly closed TCP connection

And the mingle stuff has nothing to do with the producer, right?
The setup is the same as the one I used to test workers with "--without-mingle".
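The pattern the producer relies on here (and that appears to be looping) is kombu's ensure/retry wrapper: call a function, and on a connection-level error, re-establish the connection and try again. A stripped-down sketch of that behaviour, using a plain callable as a toy stand-in for a real AMQP channel operation (all names here are hypothetical):

```python
def ensured(fun, reconnect, max_retries=3):
    """Simplified sketch of kombu's ensure()-style retry loop: run `fun`,
    and on a connection-level error re-connect and retry, up to
    `max_retries` times before giving up."""
    for retries in range(max_retries + 1):
        try:
            return fun()
        except OSError:  # e.g. [Errno 32] Broken pipe
            if retries == max_retries:
                raise
            reconnect()

# Toy stand-ins: the first two calls hit a broken pipe, the third succeeds.
state = {'calls': 0, 'reconnects': 0}

def declare_queue():
    state['calls'] += 1
    if state['calls'] < 3:
        raise OSError(32, 'Broken pipe')
    return 'queue declared'

result = ensured(declare_queue,
                 lambda: state.__setitem__('reconnects', state['reconnects'] + 1))
```

The bug report suggests the real loop never makes progress because every retry lands back on a freshly broken connection.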

@georgepsarakis
Contributor

@draskomikic @monitorius can you please try this patch: celery/kombu#724 ?

@monitorius

My kombu/connection.py looks like this now:

        # make sure we're still connected, and if not refresh.
        #self.connection
        print 'ensure_connection'
        self.ensure_connection()

And I still get reconnects:

2017-04-20 14:05:36,464 DEBUG: using channel_id: 1
2017-04-20 14:05:36,488 DEBUG: Channel open
2017-04-20 14:05:36,490 DEBUG: Closed channel #1
2017-04-20 14:05:36,537 DEBUG: Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@mondev2', 'platform': 'Erlang/OTP', 'version': '3.6.1-yandex0'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
ensure_connection
2017-04-20 14:05:36,578 DEBUG: using channel_id: 1
2017-04-20 14:05:36,600 DEBUG: Channel open
2017-04-20 14:05:36,602 DEBUG: Closed channel #1
2017-04-20 14:05:36,662 DEBUG: Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@mondev2', 'platform': 'Erlang/OTP', 'version': '3.6.1-yandex0'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
ensure_connection
2017-04-20 14:05:36,713 DEBUG: using channel_id: 1
2017-04-20 14:05:36,739 DEBUG: Channel open
2017-04-20 14:05:36,742 DEBUG: Closed channel #1
2017-04-20 14:05:36,798 DEBUG: Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@mondev2', 'platform': 'Erlang/OTP', 'version': '3.6.1-yandex0'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
ensure_connection

@monitorius

With the debugger I see that the infinite repeats come from here:
https://github.com/celery/kombu/blob/master/kombu/connection.py#L494
because we get [Errno 32] Broken pipe every time on queue declaration:
https://github.com/celery/kombu/blob/master/kombu/entity.py#L605

And the stack (down to the Broken pipe raise in amqp):

 ...
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/celery/app/task.py", line 535, in apply_async
    **options
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/celery/app/base.py", line 736, in send_task
    self.backend.on_task_call(P, task_id)
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/celery/backends/rpc.py", line 168, in on_task_call
    maybe_declare(self.binding(producer.channel), retry=True)
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/kombu/common.py", line 124, in maybe_declare
    channel, orig, **retry_policy)
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/kombu/common.py", line 143, in _imaybe_declare
    entity, declared, ident, channel, orig)
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/kombu/common.py", line 131, in _maybe_declare
    entity.declare(channel=channel)
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/kombu/entity.py", line 605, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/kombu/entity.py", line 614, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/kombu/entity.py", line 649, in queue_declare
    nowait=nowait,
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/amqp/channel.py", line 1161, in queue_declare
    nowait, arguments),
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/amqp/abstract_channel.py", line 64, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/amqp/method_framing.py", line 174, in write_frame
    write(view[:offset])
  File "/home/monitorius/rasp/morda_backend/common/virtualenv/lib/python2.7/site-packages/amqp/transport.py", line 275, in write
    raise

https://github.com/celery/py-amqp/blob/v2.1.4/amqp/transport.py#L275

@georgepsarakis
Contributor

georgepsarakis commented Apr 20, 2017

@monitorius are you using the RPC result backend? If yes, could you also try with another backend?

@monitorius

Here is the problem:
In this line kombu calls fun to send a message to RabbitMQ. It finds that the current RabbitMQ node is dead and tries to connect to the next (failover) node here. That succeeds, and we get a new channel to the failover node. But after that, in this infinite loop, kombu keeps calling fun with the SAME OLD arguments, one of which is the old channel to the dead node.

Literally, we have a new, live channel, but we keep trying to send messages over the dead one just because the args list has never been updated.

As a proof of concept I inserted this dirty hack here:

                        if on_revive:
                            on_revive(channel)

                        if isinstance(args[3], channel.__class__):
                            args = list(args)
                            args[3] = channel
                        got_connection += 1

replacing fun's channel argument with the new channel. And it worked: the producer seamlessly reconnects, as it should.

I'm new to the kombu code, but this looks like a serious problem: Channel objects are passed around everywhere, yet they are not reliable enough.
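The failure mode can be sketched in isolation. This is a hedged toy model, not kombu's actual code: `Channel`, `ensured`, and `revive` here are illustrative stand-ins for kombu's `Connection._ensured` machinery, and the fix mirrors the hack above of swapping the stale channel out of the argument list after a revive.

```python
# Toy model of the bug pattern: a retry loop that revives the connection
# but would otherwise keep calling `fun` with the original, dead channel.

class Channel:
    def __init__(self, alive=True):
        self.alive = alive

    def send(self, msg):
        if not self.alive:
            raise BrokenPipeError(32, "Broken pipe")
        return "sent:%s" % msg

def ensured(fun, args, revive, retries=5):
    """Retry fun(*args); on a broken pipe, revive the connection and
    refresh any stale Channel in args so we stop using the dead one."""
    for _ in range(retries):
        try:
            return fun(*args)
        except BrokenPipeError:
            new_channel = revive()
            # the fix: swap any stale Channel argument for the revived one
            args = [new_channel if isinstance(a, Channel) else a
                    for a in args]
    raise RuntimeError("gave up after %d retries" % retries)

dead = Channel(alive=False)
result = ensured(lambda ch, msg: ch.send(msg), [dead, "hello"],
                 revive=lambda: Channel(alive=True))
print(result)  # sent:hello
```

Without the list-comprehension step, every retry would hit the dead channel again and the loop would never make progress, which matches the behaviour in the logs above.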

@georgepsarakis
Contributor

georgepsarakis commented Apr 21, 2017

@monitorius great work debugging this. I may have an alternative. What happens if you change the line to:

maybe_declare(self.binding(producer.default_channel), retry=True)

@monitorius

AttributeError: 'Producer' object has no attribute 'default_channel'

If you mean producer.connection.default_channel, it won't help either, because it's a property: we'll still have the stale old channel in args. In my opinion, there should be some kind of channel proxy, or maybe the connection object should be passed into these functions instead of the channel.
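The "channel proxy" idea can be sketched roughly like this. The `FakeConnection` and `FakeChannel` classes are stand-ins, not kombu's real API: the point is only that a proxy which re-resolves the connection's current default channel on every attribute access would survive a reconnect transparently.

```python
# Sketch of a channel proxy: callers hold the proxy, and every attribute
# access is delegated to the connection's *current* default channel, so a
# reconnect silently swaps the target underneath them.

class FakeChannel:
    def __init__(self, channel_id):
        self.channel_id = channel_id

class FakeConnection:
    def __init__(self):
        self._next_id = 0
        self.reconnect()

    def reconnect(self):
        # simulate failing over to a new node: a fresh channel is created
        self._next_id += 1
        self._default_channel = FakeChannel(self._next_id)

    @property
    def default_channel(self):
        return self._default_channel

class ChannelProxy:
    """Delegates every attribute access to the connection's current
    default channel instead of pinning one concrete channel object."""
    def __init__(self, connection):
        self._connection = connection

    def __getattr__(self, name):
        return getattr(self._connection.default_channel, name)

conn = FakeConnection()
chan = ChannelProxy(conn)
print(chan.channel_id)  # 1
conn.reconnect()        # simulate failover
print(chan.channel_id)  # 2 -- the proxy now points at the new channel
```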

@monitorius

With the RPC backend disabled, failover works fine: fun is a bound method of the producer and uses self.channel, so the channel is renewed after failover.

@draskomikic
Author

draskomikic commented Apr 21, 2017

Sorry for the late response, guys.

I did a quick test with patch celery/kombu#724 and it seems to be working. I am not using the RPC result backend.

I will do more thorough testing in a few hours.

UPDATE:
With the patch and without the RPC result backend it works just fine: my Celery app handles RabbitMQ node failures and continues to receive and process tasks. I ran tests where I shut down and started RabbitMQ nodes in cycles and couldn't find any errors or misbehaviour.

When I include result_backend = 'rpc://' in the Celery config I start to see the error:
[Errno 111] Connection refused

@monitorius

Oh, right. In fact, we are talking about two different cases (with the same symptoms) here.

  1. Workers (consumers) trying to reconnect to the failover node.
  2. A producer trying to reconnect to the failover node.

I tested patch celery/kombu#724 only for case 2, and my last messages are about the producer. I've just tested case 1 as well, and @draskomikic is right: it works fine with workers.

@georgepsarakis
Contributor

@monitorius correct. Now that I think about it, this probably happens because the exception type that is raised is not included in the list of Connection.recoverable_connection_errors.

@georgepsarakis
Contributor

I believe it is safe to change the line to:

except conn_errors + (socket.error,) as exc:

What do you think?
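To illustrate why broadening the except tuple matters: a retry loop only recovers from the exception types it actually catches, and in Python 3 `socket.error` is an alias of `OSError`, so adding it also covers the `[Errno 32] Broken pipe` seen in the trace above. This is a standalone toy sketch, not kombu's code; `RecoverableError`, `conn_errors`, and `flaky` are made-up names.

```python
import socket

class RecoverableError(Exception):
    """Stand-in for the errors the loop already knew how to recover from."""

conn_errors = (RecoverableError,)

attempts = []

def flaky():
    # fails with a broken pipe (same errno as in the traceback) twice,
    # then succeeds
    attempts.append(1)
    if len(attempts) < 3:
        raise socket.error(32, "Broken pipe")
    return "ok"

def ensured(fun, retries=5):
    for _ in range(retries):
        try:
            return fun()
        except conn_errors + (socket.error,):  # the proposed broadening
            continue
    raise RuntimeError("gave up")

print(ensured(flaky))  # ok
```

With only `except conn_errors:`, the same broken-pipe error would propagate on the first attempt instead of being retried.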

@georgepsarakis
Contributor

@monitorius I still haven't figured out how to fully fix the issue with the RPC backend. After the connection retries, the endless loop seems to move to kombu's Connection.drain_events. Are you perhaps interested in continuing this work? I could create a branch to share it with you.

@thedrow
Member

thedrow commented Apr 27, 2017

@georgepsarakis So celery/kombu#724 is only a partial fix?

@georgepsarakis
Contributor

As verified above by @monitorius and @draskomikic, the worker issue is resolved. However, I unfortunately could not get the RPC backend to work, hence my latest comment.

@draskomikic
Author

@georgepsarakis Thanks for all the hard work and prompt responses on this issue.

@newmen

newmen commented Jul 18, 2017

Is this issue resolved?
Which Celery version contains this fix?

I've tried the dirty fix from @monitorius, patching the connection.py file as described here, and it works fine, but the current Celery 4.0.2 was packaged in Dec 2016 and thus doesn't have this fix yet.

@thedrow
Member

thedrow commented Jul 18, 2017

As far as we can tell there's only a partial fix.
The RPC backend still doesn't fail over correctly.
The newest release will contain this fix. You can track the progress of the release at #4109.

@thedrow
Member

thedrow commented Jul 21, 2017

The full fix won't make it into 4.1.0, unfortunately.
We'd be happy to release a new version with a fix contributed by any of you after the current release cycle.

@mirasrael

mirasrael commented Sep 29, 2017

@draskomikic I tried to fix this issue by proxying default_channel (which automatically replaces the default channel in case of a reconnect). Can you please test whether it works for you (before I create a pull request)?

https://github.com/mirasrael/kombu/tree/fix-reconnect-for-send-message

@auvipy
Member

auvipy commented Jan 9, 2018

Can you look into the last comment of issue #4075?

@auvipy
Member

auvipy commented Jan 9, 2018

@mirasrael please send a PR with tests

@thijstriemstra
Contributor

thijstriemstra commented Jan 16, 2018

Also encountered this issue (I think) using Celery 4.1.0; the rabbitmq log was filling up with thousands of messages like:

=WARNING REPORT==== 16-Jan-2018::06:31:37 ===
closing AMQP connection <0.2312.2> (192.168.82.10:51450 -> 192.168.82.1:5671, vhost: 'myvhost', user: 'celery'):
client unexpectedly closed TCP connection

=INFO REPORT==== 16-Jan-2018::06:31:38 ===
accepting AMQP connection <0.2325.2> (192.168.82.10:51452 -> 192.168.82.1:5671)

=INFO REPORT==== 16-Jan-2018::06:31:38 ===
connection <0.2325.2> (192.168.82.10:51452 -> 192.168.82.1:5671): user 'celery' authenticated and granted access to vhost 'myvhost'

=WARNING REPORT==== 16-Jan-2018::06:31:38 ===
closing AMQP connection <0.2325.2> (192.168.82.10:51452 -> 192.168.82.1:5671, vhost: 'myvhost', user: 'celery'):
client unexpectedly closed TCP connection

=INFO REPORT==== 16-Jan-2018::06:31:38 ===
accepting AMQP connection <0.2338.2> (192.168.82.10:51454 -> 192.168.82.1:5671)

=INFO REPORT==== 16-Jan-2018::06:31:38 ===
connection <0.2338.2> (192.168.82.10:51454 -> 192.168.82.1:5671): user 'celery' authenticated and granted access to vhost 'myvhost'

=WARNING REPORT==== 16-Jan-2018::06:31:38 ===
closing AMQP connection <0.2338.2> (192.168.82.10:51454 -> 192.168.82.1:5671, vhost: 'myvhost', user: 'celery'):
client unexpectedly closed TCP connection

etc...

This happened after restarting the device running the rabbitmq server and worker. The client calling the tasks would hammer rabbitmq with reconnects once that server+worker came back online. Removing the result_backend = 'rpc://' setting fixed the problem.
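For anyone hitting the same loop, the workaround amounts to configuring the failover broker list without the RPC result backend. A minimal sketch, reusing the example hostnames from the original report (the commented line is the one to leave out until the RPC backend fails over correctly):

```python
# Workaround sketch: keep the broker failover list, drop the RPC backend.
CELERY = dict(
    broker_url=[
        'amqp://guest@rabbitmq1:5672',
        'amqp://guest@rabbitmq2:5672',
        'amqp://guest@rabbitmq3:5672',
    ],
    # result_backend='rpc://',  # <- removing this avoids the reconnect storm
    accept_content=['json'],
    task_serializer='json',
)

print('result_backend' in CELERY)  # False
```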

@23doors
Contributor

23doors commented Feb 13, 2018

Is 4.2 coming any time soon, then?

@auvipy auvipy modified the milestones: v5.0.0, v4.3 May 27, 2018
@auvipy
Member

auvipy commented Aug 10, 2018

Can anyone confirm this issue with the latest release?
