
Celery task does not get sent to broker #6661

Closed
8 of 12 tasks
Krogsager opened this issue Mar 5, 2021 · 19 comments
Comments

@Krogsager

Krogsager commented Mar 5, 2021

Checklist

  • I have verified that the issue exists against the master branch of Celery.
  • This has already been asked to the discussion group first.
  • I have read the relevant section in the
    contribution guide
    on reporting bugs.
  • I have checked the issues list
    for similar or identical bug reports.
  • I have checked the pull requests list
    for existing proposed fixes.
  • I have checked the commit log
    to find out if the bug was already fixed in the master branch.
  • I have included all related issues and possible duplicate issues
    in this issue (If there are none, check this box anyway).

Mandatory Debugging Information

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the master branch of Celery.
  • I have included the contents of pip freeze in the issue.
  • I have included all the versions of all the external dependencies required
    to reproduce this bug.

Optional Debugging Information

  • I have tried reproducing the issue after downgrading
    and/or upgrading Celery and its dependencies.

Related Issues and Possible Duplicates

Related Issues

#5969

Possible Duplicates

My post
https://stackoverflow.com/questions/66462079/celery-task-does-not-get-send-to-broker

Environment & Settings

Celery version: 5.0.5

celery report Output:

software -> celery:5.0.5 (singularity) kombu:5.0.2 py:3.7.8
            billiard:3.6.3.0 py-amqp:5.0.5
platform -> system:Linux arch:64bit
            kernel version:4.15.0-20-generic imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:db+postgresql://docker:**@pg_db:5432/

broker: 'pyamqp://guest@my-rabbit//'
celery_accept_content: ['json']
environ: {
    'CELERY_BROKER': 'pyamqp://guest@my-rabbit//',
    'CELERY_BROKER_URL': 'amqp://guest:********@my-rabbit:5672//',
    'GPG_KEY': '********',
    'HOME': '/home/snake',
    'LANG': 'C.UTF-8',
    'LS_COLORS': 'rs=0:di=01;34:ln=01;36:mh=00:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=40;31;01:mi=00:su=37;41:sg=30;43:ca=30;41:tw=30;42:ow=34;42:st=37;44:ex=01;32:*.tar=01;31:*.tgz=01;31:*.arc=01;31:*.arj=01;31:*.taz=01;31:*.lha=01;31:*.lz4=01;31:*.lzh=01;31:*.lzma=01;31:*.tlz=01;31:*.txz=01;31:*.tzo=01;31:*.t7z=01;31:*.zip=01;31:*.z=01;31:*.Z=01;31:*.dz=01;31:*.gz=01;31:*.lrz=01;31:*.lz=01;31:*.lzo=01;31:*.xz=01;31:*.zst=01;31:*.tzst=01;31:*.bz2=01;31:*.bz=01;31:*.tbz=01;31:*.tbz2=01;31:*.tz=01;31:*.deb=01;31:*.rpm=01;31:*.jar=01;31:*.war=01;31:*.ear=01;31:*.sar=01;31:*.rar=01;31:*.alz=01;31:*.ace=01;31:*.zoo=01;31:*.cpio=01;31:*.7z=01;31:*.rz=01;31:*.cab=01;31:*.jpg=01;35:*.jpeg=01;35:*.mjpg=01;35:*.mjpeg=01;35:*.gif=01;35:*.bmp=01;35:*.pbm=01;35:*.pgm=01;35:*.ppm=01;35:*.tga=01;35:*.xbm=01;35:*.xpm=01;35:*.tif=01;35:*.tiff=01;35:*.png=01;35:*.svg=01;35:*.svgz=01;35:*.mng=01;35:*.pcx=01;35:*.mov=01;35:*.mpg=01;35:*.mpeg=01;35:*.m2v=01;35:*.mkv=01;35:*.webm=01;35:*.ogm=01;35:*.mp4=01;35:*.m4v=01;35:*.mp4v=01;35:*.vob=01;35:*.qt=01;35:*.nuv=01;35:*.wmv=01;35:*.asf=01;35:*.rm=01;35:*.rmvb=01;35:*.flc=01;35:*.avi=01;35:*.fli=01;35:*.flv=01;35:*.gl=01;35:*.dl=01;35:*.xcf=01;35:*.xwd=01;35:*.yuv=01;35:*.cgm=01;35:*.emf=01;35:*.ogv=01;35:*.ogx=01;35:*.aac=00;36:*.au=00;36:*.flac=00;36:*.m4a=00;36:*.mid=00;36:*.midi=00;36:*.mka=00;36:*.mp3=00;36:*.mpc=00;36:*.ogg=00;36:*.ra=00;36:*.wav=00;36:*.oga=00;36:*.opus=00;36:*.spx=00;36:*.xspf=00;36:',
    'PATH': '/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin',
    'PWD': '/home/snake',
    'PYTHONDONTWRITEBYTECODE': '1',
    'PYTHONUNBUFFERED': '1',
    'PYTHON_VERSION': '3.7.8',
    'SHLVL': '1',
    'TERM': 'xterm',
    '_': '/usr/local/bin/celery'}
result_backend: 'db+postgresql://docker:********@pg_db:5432/'
result_persistent: False
result_serializer: 'json'
task_serializer: 'json'
worker_concurrency: 16
deprecated_settings: None

Steps to Reproduce

Required Dependencies

  • Minimal Python Version: 3.7
  • Minimal Celery Version: 4.7.7
  • Minimal Kombu Version: 5.0.2
  • Minimal Broker Version: RabbitMQ
  • Minimal Result Backend Version: N/A
  • Minimal OS and/or Kernel Version: 4.15.0-20-generic #21-Ubuntu SMP
  • Minimal Broker Client Version: N/A or Unknown
  • Minimal Result Backend Client Version: N/A or Unknown

Python Packages

pip freeze Output:

alembic==1.5.5
amqp==5.0.5
Beaker==1.11.0
billiard==3.6.3.0
celery==5.0.5
certifi==2020.12.5
cffi==1.14.5
chardet==4.0.0
click==7.1.2
click-didyoumean==0.0.3
click-plugins==1.1.1
click-repl==0.1.6
cryptography==3.4.6
defusedxml==0.7.0
et-xmlfile==1.0.1
Flask==1.1.1
Flask-Migrate==2.5.3
Flask-pyoidc==3.4.0
Flask-SQLAlchemy==2.4.1
Flask-Testing==0.8.1
Flask-WTF==0.14.3
future==0.18.2
gunicorn==20.0.4
idna==2.10
importlib-metadata==3.7.0
importlib-resources==5.1.2
itsdangerous==1.1.0
jdcal==1.4.1
Jinja2==2.11.3
kombu==5.0.2
Mako==1.1.4
MarkupSafe==1.1.1
oic==1.1.2
openpyxl==3.0.3
prompt-toolkit==3.0.16
psycopg2==2.8.6
pycparser==2.20
pycryptodomex==3.10.1
pyjwkest==1.4.2
pyodbc==4.0.30
python-dateutil==2.8.1
python-editor==1.0.4
python-Levenshtein==0.12.2
pytz==2021.1
requests==2.25.1
six==1.15.0
SQLAlchemy==1.3.23
typing-extensions==3.7.4.3
urllib3==1.26.3
vine==5.0.0
wcwidth==0.2.5
Werkzeug==1.0.1
WTForms==2.3.3
zipp==3.4.1

Other Dependencies

N/A

Broker details

Listeners

Interface: [::], port: 15672, protocol: http, purpose: HTTP API
Interface: [::], port: 15692, protocol: http/prometheus, purpose: Prometheus exporter API over HTTP
Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0

Expected Behavior

.delay() and .apply_async() tasks are sent to the RabbitMQ broker.

Actual Behavior

When I try to send my task to the broker (RabbitMQ), it hangs.

# python shell
promise = foo.s(first_arg="2").apply_async()
# Blocks indefinitely. I expected a promise (AsyncResult) object.

If I run the task synchronously it works as expected.

# python shell
promise = foo.s(first_arg="2").apply()
>>> hello argument 2

If I interrupt .apply_async() with ctrl+c I get a traceback with some clues:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 32, in __call__
    return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 173, in _connect
    host, port, family, socket.SOCK_STREAM, SOL_TCP)
  File "/usr/local/lib/python3.7/socket.py", line 752, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -9] Address family for hostname not supported

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 325, in retry_over_time
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 866, in _connection_factory
    self._connection = self._establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 801, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/pyamqp.py", line 128, in establish_connection
    conn.connect()
  File "/usr/local/lib/python3.7/site-packages/amqp/connection.py", line 323, in connect
    self.transport.connect()
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 113, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 184, in _connect
    "failed to resolve broker hostname"))
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 197, in _connect
    self.sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.7/site-packages/celery/canvas.py", line 225, in apply_async
    return _apply(args, kwargs, **options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 565, in apply_async
    **options
  File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 749, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 532, in send_task_message
    **properties
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 178, in publish
    exchange_name, declare,
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 525, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 184, in _publish
    channel = self.channel
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 206, in _get_channel
    channel = self._channel = channel()
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 34, in __call__
    value = self.__value__ = self.__contract__()
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 221, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 884, in default_channel
    self._ensure_connection(**conn_opts)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 439, in _ensure_connection
    callback, timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 339, in retry_over_time
    sleep(1.0)

The broker connection string looks like this in the system:

~$ env | grep BROKER
CELERY_BROKER=pyamqp://guest@172.23.0.3//

The broker connection string in python:

# python shell
from src.celery import app
app.pool.connection
>>> Connection: amqp://guest:**@localhost:5672//

Before you suggest that RabbitMQ is not running or that the connection string is bad: my Celery worker (consumer) process is able to connect with the same connection string.

-------------- celery@f9ab48fc6b63 v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Linux-4.15.0-20-generic-x86_64-with-debian-9.12 2021-03-05 07:56:29
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         celery_statst_api:0x7f15b6de0450
- ** ---------- .> transport:   amqp://guest:**@my-rabbit:5672//
- ** ---------- .> results:     postgresql://docker:**@pg_db:5432/
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . foo_task
  . (long list of tasks)

[2021-03-05 07:56:30,564: INFO/MainProcess] Connected to amqp://guest:**@my-rabbit:5672//
[2021-03-05 07:56:30,581: INFO/MainProcess] mingle: searching for neighbors
[2021-03-05 07:56:31,622: INFO/MainProcess] mingle: all alone
[2021-03-05 07:56:31,647: INFO/MainProcess] celery@f9ab48fc6b63 ready.

This is how I connect the app/producer to the broker.
The file celeryconfig.py contains the setup for the broker URL, backend, concurrency, etc.

# celery_tasks.py
# imports...
app = Celery('celery_statst_api')
app.config_from_object(celeryconfig) # import config file

@app.task(name="foo")
def foo(first_arg: str) -> str:
    print(f"thanks for {first_arg}")
    return "OK"

@thedrow
Member

thedrow commented Mar 5, 2021

It seems like we can't connect to the broker.
This doesn't strike me as a bug.

@Krogsager
Author

Hi thedrow. I need some help debugging this - I cannot decipher the source code.
It might not be a bug per se, but the Celery worker process can connect, and I can telnet to the broker at port 5672.

I debugged this line in the traceback

File "/usr/local/lib/python3.7/socket.py", line 752, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):

and the host and port are '127.0.0.1' and 5672. Why then should the code throw socket.gaierror: [Errno -9]?
I would at least like to do a PR with some info for other developers on how to fix the problem.

@Krogsager
Author

Kombu does not seem to be the exact problem. I'm running this simple script and the broker receives the message, albeit as "unroutable".

from kombu import Connection
from os import getenv

x = getenv('CELERY_BROKER')
conn = Connection(x)
conn.connect()
producer = conn.Producer()
# No exchange or routing key is given, so the message goes to the default
# exchange with an empty routing key and the broker flags it as unroutable.
y = producer.publish({'hello': 'world'})
print(y.failed)

False

@thedrow
Member

thedrow commented Mar 10, 2021

This is strange.
socket.gaierror: [Errno -9] means Address family for hostname not supported.
Since telnet is responding, this may happen if something else is listening instead of RabbitMQ.
Can you please try to publish something that is routable with kombu and see what happens?

@Krogsager
Author

Krogsager commented Mar 10, 2021

@thedrow I ran the sample code here and it ran perfectly: it created the video queue and posted a message to it.

from kombu import Connection, Exchange, Queue
from os import getenv

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    message.ack()

# connections
env_broker = getenv('CELERY_BROKER')
print(f"connect to {env_broker}")

with Connection(env_broker) as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

print("done.")


@thedrow
Member

thedrow commented Mar 10, 2021

In the configuration you supplied in the original post, you're not pointing to localhost.
Are you sure there's a broker listening there?
Can you celery call a task using that configuration? What about using the rabbitmq docker container and mapping 5672 to a different port and trying that?

Can you try running that code example in a clean VM? Maybe it's your system that is misconfigured?

@Krogsager
Author

Krogsager commented Mar 11, 2021

In the configuration you supplied in the original post, you're not pointing to localhost.

The system configuration is as such:
Container A: Celery producer and consumer.
Container B: RabbitMQ (official Docker image)
Container C: PostgreSQL backend (official Docker image).
The containers are on the same Docker network, where the broker is named my-rabbit and resolves to IP 172.23.0.3.

In my original post I wrote CELERY_BROKER=pyamqp://guest@172.23.0.3// but that was just for clarification. It has always been my-rabbit. (I did try to manually set the IP as well for testing, but that made no difference.)

I have noticed that the different components interpret the Celery broker string differently.
I supply the environment variable: pyamqp://guest@my-rabbit//
The worker process sees: amqp://guest:**@my-rabbit:5672//
The producer (app) sees: amqp://guest:**@localhost:5672// <-- this stands out.
The Kombu snippet sees: amqp://guest:**@my-rabbit:5672//

Are you sure there's a broker listening there?

I am sure that the broker is listening because my Kombu tests from yesterday (here and here) are running in container A. Not to mention the Celery worker process.

Can you celery call a task using that configuration?

I will get back to you ASAP.

@thedrow
Member

thedrow commented Mar 11, 2021

That's strange and it may definitely be a bug.

@thedrow thedrow added this to the 5.1.0 milestone Mar 11, 2021
@thedrow thedrow added this to To do in Celery 5.1.0 via automation Mar 11, 2021
@Krogsager
Author

Can you celery call a task using that configuration?

Yes, I can call a task from container A.

~$ celery call -a "[3]" foo_task
74c69e81-7903-4efe-be02-d864c68756bd

@thedrow
Member

thedrow commented Mar 11, 2021

And from outside the container?

@Krogsager
Author

Can you give an example?

@thedrow
Member

thedrow commented Mar 11, 2021

The correct environment variable is not CELERY_BROKER but CELERY_BROKER_URL.

If there's a mistake in the documentation, please let us know.

@thedrow thedrow closed this as completed Mar 11, 2021
Celery 5.1.0 automation moved this from To do to Done Mar 11, 2021
@Krogsager
Author

Krogsager commented Mar 11, 2021

@thedrow I disagree that this issue is invalid: I do not rely on the default variable CELERY_BROKER_URL. It is a coincidence that the two env vars look so similar.
I instantiate the app like this:

#celery_tasks.py
# ...import
app = Celery('celery_statst_api')
app.config_from_object(celeryconfig) # import config file

and my config file looks like this:

#celeryconfig.py
from os import environ
broker = environ.get('CELERY_BROKER', 'default')
result_backend = 'db+postgresql://docker:************@pg_db:5432'

task_serializer= 'json'
result_serializer= 'json'
celery_accept_content = ['json']
# ...

@thedrow
Member

thedrow commented Mar 11, 2021

Why do you need a special environment variable?
If you set the correct one, everything works right?

@thedrow
Member

thedrow commented Mar 11, 2021

Also, it's 'broker_url'

@Krogsager
Author

Krogsager commented Mar 11, 2021

That was the issue! A simple misconfiguration. I wrote broker instead of broker_url.
From a usability standpoint it is nice that the code defaults to amqp://guest:**@localhost:5672// when no broker URL is set.
But I think it is too aggressive. Would you consider showing a warning when no broker is explicitly set or available to the producer?
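
For anyone hitting the same symptom, a corrected celeryconfig.py would look roughly like this (a sketch; only the setting names differ from the file quoted above):

#celeryconfig.py
from os import environ

# 'broker_url' is the lowercase setting name Celery reads; a setting named
# 'broker' is ignored, which is why the app silently fell back to localhost.
broker_url = environ.get('CELERY_BROKER', 'pyamqp://guest@my-rabbit//')
result_backend = 'db+postgresql://docker:************@pg_db:5432'

task_serializer = 'json'
result_serializer = 'json'
# The lowercase form of this option is 'accept_content'.
accept_content = ['json']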

@Krogsager
Author

Why do you need a special environment variable?
If you set the correct one, everything works right?

Sometimes I test several systems in the same environment, and therefore I need to separate the run settings.

Krogsager added a commit to Krogsager/kombu that referenced this issue Mar 11, 2021
Based on my headaches with silent revert to `localhost` I submit this PR.
The developer should be notified if their host settings are not found.
Details on the issue are here: celery/celery#6661
thedrow pushed a commit to celery/kombu that referenced this issue Mar 15, 2021
Based on my headaches with silent revert to `localhost` I submit this PR.
The developer should be notified if their host settings are not found.
Details on the issue are here: celery/celery#6661
@thedrow
Member

thedrow commented Apr 4, 2021

Would you consider showing a warning when no broker is explicitly set or available to the producer?

Yes, that would be nice.
