
Celery does not start all amqp connections with heartbeats #7250

Open
10 of 18 tasks
massover opened this issue Jan 21, 2022 · 10 comments

Comments

@massover

massover commented Jan 21, 2022

Checklist

  • I have verified that the issue exists against the master branch of Celery.
  • This has already been asked to the discussions forum first.
  • I have read the relevant section in the
    contribution guide
    on reporting bugs.
  • I have checked the issues list
    for similar or identical bug reports.
  • I have checked the pull requests list
    for existing proposed fixes.
  • I have checked the commit log
    to find out if the bug was already fixed in the master branch.
  • I have included all related issues and possible duplicate issues
    in this issue (If there are none, check this box anyway).

Mandatory Debugging Information

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the master branch of Celery.
  • I have included the contents of pip freeze in the issue.
  • I have included all the versions of all the external dependencies required
    to reproduce this bug.

Optional Debugging Information

  • I have tried reproducing the issue on more than one Python version
    and/or implementation.
  • I have tried reproducing the issue on more than one message broker and/or
    result backend.
  • I have tried reproducing the issue on more than one version of the message
    broker and/or result backend.
  • I have tried reproducing the issue on more than one operating system.
  • I have tried reproducing the issue on more than one workers pool.
  • I have tried reproducing the issue with autoscaling, retries,
    ETA/Countdown & rate limits disabled.
  • I have tried reproducing the issue after downgrading
    and/or upgrading Celery and its dependencies.

Related Issues and Possible Duplicates

Related Issues

  • None

Possible Duplicates

  • None

Environment & Settings

Celery version: 5.2.3 (dawn-chorus)

celery report Output:

(venv) ➜  heartbeat-bug celery -A main report        

software -> celery:5.2.3 (dawn-chorus) kombu:5.2.3 py:3.9.9
            billiard:3.6.4.0 py-amqp:5.0.9
platform -> system:Darwin arch:64bit
            kernel version:19.6.0 imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:amqp results:disabled

broker_url: 'amqp://guest:********@localhost:5672//'
deprecated_settings: None

Steps to Reproduce

Required Dependencies

  • Minimal Python Version: Python 3.9.9
  • Minimal Celery Version: master
  • Minimal Kombu Version: 5.2.3
  • Minimal Broker Version: N/A or Unknown
  • Minimal Result Backend Version: N/A or Unknown
  • Minimal OS and/or Kernel Version: N/A or Unknown
  • Minimal Broker Client Version: N/A or Unknown
  • Minimal Result Backend Client Version: N/A or Unknown

Python Packages

pip freeze Output:

(venv) ➜  heartbeat-bug pip freeze                   
amqp==5.0.9
billiard==3.6.4.0
celery @ git+https://github.com/celery/celery.git@fdb4af3cbf88ab59a3ed25a13b554b718768d178
click==8.0.3
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.2.0
kombu==5.2.3
prompt-toolkit==3.0.24
pytz==2021.3
six==1.16.0
vine==5.0.0
wcwidth==0.2.5

Other Dependencies

N/A

Minimally Reproducible Test Case

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task
def hello():
    return 'hello world'

# start the worker normally with 
# celery -A main worker -l info      

Expected Behavior

I think all of the RabbitMQ connections the worker creates should have AMQP heartbeats.

Why does this matter? When machines connected to RabbitMQ are terminated (more likely in an ephemeral cloud environment), their connections are never closed on the RabbitMQ side. Eventually RabbitMQ runs out of available connections or, worse, may crash.

Actual Behavior

The main worker connection gets a 60s heartbeat, but the supplemental worker connections (such as mingle) have no heartbeat.

[screenshot]
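One way to see which connections came up without a heartbeat is the RabbitMQ management HTTP API. The sketch below assumes the management plugin is enabled on localhost:15672 with the default guest account, and that each connection's `timeout` field holds the negotiated heartbeat in seconds (as far as I know, that is what the management UI shows as the heartbeat). The helper names are hypothetical.

```python
import base64
import json
import urllib.request
from collections import Counter


def fetch_connections(base_url="http://localhost:15672", user="guest", password="guest"):
    """Return the list of connections from the RabbitMQ management HTTP API."""
    req = urllib.request.Request(f"{base_url}/api/connections")
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req.add_header("Authorization", f"Basic {token}")
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)


def connections_without_heartbeat(connections):
    """Connections whose negotiated heartbeat (`timeout`) is 0 or missing."""
    return [c for c in connections if not c.get("timeout")]


def count_by_peer(connections):
    """Count connections per client host, e.g. to spot hosts holding many."""
    return Counter(c.get("peer_host", "?") for c in connections)
```

For example, running `connections_without_heartbeat(fetch_connections())` while the worker above is up should list the connections RabbitMQ will never time out, and `count_by_peer` on that result shows which hosts they belong to.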

Passing heartbeat in broker_transport_options applies the heartbeat to all connections:

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//', broker_transport_options={"heartbeat": 40})

@app.task
def hello():
    return 'hello world'

[screenshot]

@open-collective-bot

Hey @massover 👋,
Thank you for opening an issue. We will get back to you as soon as we can.
Also, check out our Open Collective and consider backing us - every little helps!

We also offer priority support for our sponsors.
If you require immediate assistance please consider sponsoring us.

@Hoohaha

Hoohaha commented Jun 23, 2022

I have the same issue. RabbitMQ won't close dead connections, so tens of millions of them pile up and slow RabbitMQ down.

If I set the heartbeat in the AMQP transport, RabbitMQ closes the connection because Celery doesn't send AMQP heartbeats.
The Celery worker keeps trying to establish new connections, which get killed, and then it tries again.

Is there a workaround for this?
Or should I switch to the Redis broker?

@Hoohaha

Hoohaha commented Jun 23, 2022

I have some new discoveries.

My environment:
celery=5.1.2
python=3.9
os=windows 10 laptop

When Pool = eventlet

heartbeat_check is registered when asynloop is loaded: https://github.com/celery/celery/blob/master/celery/worker/loops.py#L59
However, the timer never fires the event on time.
No idea why; eventlet's timer seems to have a bug.

When Pool = threads
When using threads as the pool, heartbeat_check never seems to be registered with the timer.
The workaround below forces the heartbeat_check event to be registered with the consumer's timer (c.timer):

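from celery import bootsteps

HEARTBEAT = 60  # seconds; assumed to match the heartbeat negotiated with the broker


class AMQPHeart(bootsteps.StartStopStep):
    # Start only after the consumer's broker connection exists.
    requires = ('celery.worker.consumer:Connection',)

    def start(self, c):
        tick = c.connection.heartbeat_check
        # Run the check twice per heartbeat interval, with rate=2.
        c.timer.call_repeatedly(HEARTBEAT / 2, tick, (2,))

# do not forget to add the step (`app` is your Celery application)
app.steps['consumer'].add(AMQPHeart)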

@kinoute

kinoute commented Dec 27, 2022

I have exactly the same problem. Our RabbitMQ instance sometimes has more than 60k connections for... 300 consumers. For the same consumer, some connections have the correct default heartbeat set and some don't. The latter are never closed properly and make the RabbitMQ instance's memory grow, leading to OOM/memory alarms.

We had to write a bash script that periodically kills connections on the RabbitMQ instance by pinging every IP:port associated with a RabbitMQ connection. This is very annoying.

@auvipy auvipy added this to the 5.3 milestone Dec 27, 2022
@Nusnus Nusnus modified the milestones: 5.3, Future Feb 19, 2023
@shouldsee

shouldsee commented Nov 1, 2023

(Quoting @Hoohaha's earlier comment: the eventlet/threads findings and the AMQPHeart bootstep workaround.)

Hmm, the connection still dropped after adding this bootstep, but probably because I'm looking at the publisher side.

I had to override the AMQP class when initializing the Celery instance to make sure heartbeats are sent on the producer side:

from celery import Celery

app = Celery('myapp', amqp='myproj.amqp_util:AMQP')

amqp_util.py:

import traceback

from celery.app.amqp import AMQP as BaseAMQP
from kombu import Connection as BaseConnection
from kombu.asynchronous.timer import Timer

HEARTBEAT_RATE = 10
DEBUG = False  # set to True to print every heartbeat tick and any errors


def _enable_amqheartbeats(timer, connection, rate=2.0):
    '''Schedule periodic heartbeat checks for ``connection`` on ``timer``.

    Adapted from celery.worker.loops.
    '''
    heartbeat_error = [None]

    if not connection:
        return heartbeat_error

    heartbeat = connection.get_heartbeat_interval()  # negotiated
    if not (heartbeat and connection.supports_heartbeats):
        return heartbeat_error

    def tick(rate):
        try:
            if DEBUG:
                print('[dbg_sent_hb]-----------------------------------------')
            connection.heartbeat_check(rate)
        except Exception as e:
            # heartbeat_error is a fixed-size (length 1) list, so the closure
            # can update it in place and the caller sees the latest error.
            if DEBUG:
                print(f'[heartbeat_check_exception]{e}')
                traceback.print_exc()
            heartbeat_error[0] = e

    timer.call_repeatedly(heartbeat / rate, tick, (rate,))
    return heartbeat_error


class HeartbeatConnection(BaseConnection):
    # One timer shared by every connection of this class. Note that kombu's
    # Timer only schedules entries; they fire when something iterates the
    # timer (as an event-loop hub does), not on a thread of its own.
    timer = Timer()

    def _ensure_connection(self, *a, **kw):
        # Forward the retry/timeout arguments to kombu unchanged.
        ret = super()._ensure_connection(*a, **kw)
        _enable_amqheartbeats(self.timer, self, HEARTBEAT_RATE / 2.)
        return ret


class AMQP(BaseAMQP):
    # Celery creates its broker connections through app.amqp.Connection,
    # so pointing it at the subclass above covers producer connections too.
    Connection = HeartbeatConnection

    #: compat alias to Connection
    BrokerConnection = HeartbeatConnection

@auvipy auvipy modified the milestones: Future, 5.4.x Nov 13, 2023
@slavestys

slavestys commented Mar 5, 2024

In consumer/events.py:

def start(self, c):
    # flush events sent while connection was down.
    prev = self._close(c)
    dis = c.event_dispatcher = c.app.events.Dispatcher(
        c.connection_for_write(),
        ...

Does the heartbeat need to be passed to c.connection_for_write()?

kombu/connection.py also has a default_channel property, and there, too, a connection is created without a heartbeat.

@Hoohaha

Hoohaha commented Mar 11, 2024

The AMQP heartbeat tick is started only conditionally, when the pool is green (is_green), in synloop: https://github.com/celery/celery/blob/main/celery/worker/loops.py#L115
That may be why the heartbeat is missed when the pool is threads.
[screenshot]
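The effect of that condition can be illustrated with a blocking consume loop that, unlike synloop with a non-green pool, always calls heartbeat_check between drains. This is a hypothetical helper, not Celery code; it assumes a kombu-style connection object exposing get_heartbeat_interval(), drain_events(timeout=...) (raising socket.timeout when idle) and heartbeat_check(rate).

```python
import socket


def drain_with_heartbeats(connection, rate=2.0, should_continue=lambda: True):
    """Hypothetical blocking consume loop that keeps AMQP heartbeats flowing.

    `connection` is assumed to behave like a kombu Connection.
    """
    interval = connection.get_heartbeat_interval() or 0
    # Wake up `rate` times per heartbeat interval; when heartbeats are
    # disabled, fall back to an arbitrary 2 s wait.
    timeout = interval / rate if interval else 2.0
    while should_continue():
        try:
            connection.drain_events(timeout=timeout)
        except socket.timeout:
            pass  # nothing arrived; still due for a heartbeat check
        if interval:
            # Sends a heartbeat if one is due and raises if the broker
            # has been silent for too long.
            connection.heartbeat_check(rate)
```

Waking up every interval / rate seconds keeps the checks frequent enough regardless of message traffic. The checks only run while this loop is alive, so the broker-side timeout is still what cleans up after a killed process.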

@slavestys

slavestys commented Mar 11, 2024

https://www.rabbitmq.com/docs/heartbeats
Heartbeats are handled by RabbitMQ and initiated on the server side, so no additional client-side action should be needed to enable them. Or am I misunderstanding something?

In other words, if the Celery process dies unexpectedly, all these client-side timer checks stop running anyway, and the connection remains.

@kinoute

kinoute commented Mar 11, 2024

If the heartbeat were set properly on these connections, RabbitMQ would detect that your Celery process is gone and close them.

What we can see in the first two screenshots is that the heartbeat interval is not set at all for some connections inside the same Celery worker. Some have it, some don't (a heartbeat value of 0 means the connection is never checked and stays in RabbitMQ's memory even after the Celery worker is gone).

We don't understand why, within the same Celery worker, the connections it establishes don't all respect the heartbeat interval given by the RabbitMQ instance. Fortunately, forcing it with broker_transport_options fixes the issue.
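The heartbeat is negotiated per connection when it is opened, which would explain why the server's setting alone does not cover every connection. Below is a sketch of the negotiation as I understand py-amqp to perform it (an assumption worth verifying against py-amqp's connection tuning code; negotiate_heartbeat is a hypothetical name): RabbitMQ proposes its value (60 s by default), the client proposes its own, and a client that proposes nothing ends up with 0. With Celery's default broker_heartbeat of 120 s this gives the 60 s seen on the main worker connection.

```python
def negotiate_heartbeat(server_heartbeat, client_heartbeat):
    """Hypothetical model of the heartbeat (seconds) a py-amqp client uses.

    Assumed rule: if either side proposes 0, take the other value;
    otherwise take the smaller one. A client that did not ask for a
    heartbeat disables it entirely.
    """
    server = server_heartbeat or 0
    client = client_heartbeat or 0
    if server == 0 or client == 0:
        negotiated = max(server, client)
    else:
        negotiated = min(server, client)
    if not client:
        negotiated = 0  # no client heartbeat requested: disabled
    return negotiated
```

Under this model, a connection opened without a heartbeat argument (as described above for connection_for_write and default_channel) shows a heartbeat of 0, while setting heartbeat in broker_transport_options gives every connection a non-zero client value.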

@slavestys

slavestys commented Mar 11, 2024

> What we can see in the first two screenshots is that the heartbeat interval is not set at all for some connections inside the same Celery worker.

(Repeating my earlier comment: in consumer/events.py the event Dispatcher is given c.connection_for_write(), and kombu's default_channel property also creates a connection, both without a heartbeat.)

Maybe because the heartbeat parameter isn't passed everywhere a connection is created.

> Fortunately, forcing it with broker_transport_options fixes the issue.

It helped. Thank you very much!
