You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
from celery import Celery
class celeryconfig:
broker_url = 'pyamqp://guest@localhost//'
task_publish_retry = False
app = Celery('tasks')
app.config_from_object(celeryconfig)
@app.task
def add(x, y):
return x + y
Step 2: shutdown broker
Step 3: from python prompt:
$ python
>>> from tasks import add
>>> add.delay(1,2)
Expected Behavior
Quit process due to exception: kombu.exceptions.OperationalError: [Errno 61] Connection refused
Actual Behavior
Wait INDEFINITELY
Analysis
at line 870 of celery/app/base.y, parameter transport_options will only cite calling parameter of calling _connection() & conf.broker_transport_options. Bothtask_publish_retry and task_publish_retry_policy are not included.
at line 1196 of celery/app/base.y, self._pool is generated for the first time by calling connection_for_write()without any parameter. So only conf.broker_transport_options will be used for initiaizing Connection instance of kombu, both task_publish_retry and task_publish_retry_policy will be ignored.
at line 559 of celery/app/amqp.py, both task_publish_retry and task_publish_retry_policy is passed to function publish() of kombu, but these two parameters will be dropped when referencing self.channel to create a connection channel at line 174 of kombu/messaging.py, which will make call to ChannelPromise(lambda: connection.default_channel) at line 224, and get result from Connection.default_channel.
at line 844 of kombu/connection.py we can find that default_channel() takes no parameter at all, and will only take the initialization parameters of Collection instance. As a result, settings from task_publish_retry and task_publish_retry_policy configuration are not included, because they're not part of instance initialization as mentioned above.
at line 383 of kombu/connection.py we can further inspect that the real value of parameter max_retries passed to function ensure_connection() is None. So it will repeat indefinitely.
For the same reason, the retry and retry_policy parameter when calling apply_async() or send_task() does not work, either. Because these parameter will not reach default_channel of kombu at all.
So either the code should be fixed to include task_publish_retry and task_publish_retry_policy in Connection instance initialization, or the instance variables of Connection should be modified when calling Producer.publish(), or the document should be modified to clarify that these two configurations will not work as intended.
Temporary Workaround
Use broker_transport_options instead of task_publish_retry/task_publish_retry_policy:
The conclusion is identical to my post, use broker_transport_options instead. This problem (or inconsistency of documentation) was reported back in 2018, but the situation still remains in v4.4.2.
The "Calling Task" section of document should really be modified to clarify that all task_publish_retry, task_publish_retry_policy, retry and retry_policy will not work as intended when connecting to broker for the first time.
The text was updated successfully, but these errors were encountered:
Checklist
master
branch.Mandatory Debugging Information
celery -A proj report
in the issue.master
branch.pip freeze
in the issue.Optional Debugging Information
ETA/Countdown & rate limits disabled.
and/or upgrading Celery and its dependencies.
Related Issues and Possible Duplicates
Related Issues
#2991
#2457
Possible Duplicates
#4296
Environment & Settings
celery report
Output:software -> celery:4.4.2 (cliffs) kombu:4.6.8 py:3.7.7
billiard:3.6.3.0 py-amqp:2.5.2
platform -> system:Darwin arch:64bit
kernel version:19.4.0 imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:disabled
broker_url: 'amqp://guest:********@localhost:5672//'
task_publish_retry: False
Steps to Reproduce
Required Dependencies
Python Packages
pip freeze
Output:amqp==2.5.2
billiard==3.6.3.0
celery==4.4.2
docutils==0.16
importlib-metadata==1.6.0
kombu==4.6.8
lockfile==0.12.2
protobuf==3.11.4
python-daemon==2.2.4
pytz==2019.3
six==1.14.0
vine==1.3.0
zipp==3.1.0
Other Dependencies
N/A
Minimally Reproducible Test Case
Step 1: vi tasks.py
Step 2: shutdown broker
Step 3: from python prompt:
Expected Behavior
Quit process due to exception:
kombu.exceptions.OperationalError: [Errno 61] Connection refused
Actual Behavior
Wait INDEFINITELY
Analysis
at line 870 of
celery/app/base.y
, parametertransport_options
will only cite calling parameter ofcalling _connection()
&conf.broker_transport_options
. Bothtask_publish_retry
andtask_publish_retry_policy
are not included.at line 1196 of
celery/app/base.y
,self._pool
is generated for the first time by callingconnection_for_write()
without any parameter. So onlyconf.broker_transport_options
will be used for initiaizingConnection
instance ofkombu
, bothtask_publish_retry
andtask_publish_retry_policy
will be ignored.at line 559 of
celery/app/amqp.py
, bothtask_publish_retry
andtask_publish_retry_policy
is passed to functionpublish()
ofkombu
, but these two parameters will be dropped when referencingself.channel
to create a connection channel at line 174 ofkombu/messaging.py
, which will make call toChannelPromise(lambda: connection.default_channel)
at line 224, and get result fromConnection.default_channel
.at line 844 of
kombu/connection.py
we can find thatdefault_channel()
takes no parameter at all, and will only take the initialization parameters of Collection instance. As a result, settings fromtask_publish_retry
andtask_publish_retry_policy
configuration are not included, because they're not part of instance initialization as mentioned above.at line 383 of
kombu/connection.py
we can further inspect that the real value of parametermax_retries
passed to functionensure_connection()
isNone
. So it will repeat indefinitely.For the same reason, the
retry
andretry_policy
parameter when callingapply_async()
orsend_task()
does not work, either. Because these parameter will not reachdefault_channel
ofkombu
at all.So either the code should be fixed to include
task_publish_retry
andtask_publish_retry_policy
inConnection
instance initialization, or the instance variables ofConnection
should be modified when callingProducer.publish()
, or the document should be modified to clarify that these two configurations will not work as intended.Temporary Workaround
Use
broker_transport_options
instead oftask_publish_retry
/task_publish_retry_policy
:Didn't find a way to turn off retries, though.
[EDIT]
Add possible duplicate through Googling: #4296
The conclusion is identical to my post, use
broker_transport_options
instead. This problem (or inconsistency of documentation) was reported back in 2018, but the situation still remains in v4.4.2.The "Calling Task" section of document should really be modified to clarify that all
task_publish_retry
,task_publish_retry_policy
,retry
andretry_policy
will not work as intended when connecting to broker for the first time.The text was updated successfully, but these errors were encountered: