…ork (#5718)

* 'abstractproperty' is deprecated. Use 'property' with 'abstractmethod' instead

* Fix #2849 - Initial work of celery 5.0.0 alpha1 series by dropping python below 3.6 from matrix & removing imports from __future__ (#5684)

* initial work of celery 5.0.0 alpha-1 series by dropping python below 3.6

* i-5651(ut): add ut for ResultSet.join_native (#5679)

* dropped python versions below 3.6 from tox

* dropped python versions below 3.6 from travis

* dropped python versions below 3.6 from appveyor

* dropped python2 compat __future__ imports from tests

* Fixed a bug where canvases with a group and tasks in the middle followed by a group fail to complete and hang indefinitely. (#5681)

Fixes #5512, fixes #5354, fixes #2573.

* dropped python2 compat __future__ imports from celery

* dropped python2 compat code from init

* revert readme change about version

* removed python 2 object inheritance (#5687)

* removed python 2 object inheritance

* removed python 2 object inheritance

* removed python 2 compatibility decorator (#5689)

* removed python 2 compatibility decorator

* removed python 2 compatibility decorator

* removed python 2 compatibility decorator

* removed python 2 compatibility decorator

* Remove unused imports.

* Remove unused imports of python_2_unicode_compatible.

Also removed leftover usage of them where they were still used.

* Run pyupgrade on codebase (#5726)

* Run pyupgrade on codebase.

* Use format strings where possible.

* pyupgrade examples.

* pyupgrade on celerydocs extension.

* pyupgrade on updated code.

* Address code review comments.

* Address code review comments.

* Remove unused imports.

* Fix indentation.

* Address code review comments.

* Fix syntax error.

* Fix syntax error.

* Fix syntax error.

* pytest 5.x for celery 5 (#5791)

* Port latest changes from master to v5-dev (#5942)

* Fix serialization and deserialization of nested exception classes (#5717)

* Fix #5597: chain priority (#5759)

* adding `worker_process_shutdown` to __all__ (#5762)

* Fix typo (#5769)

* Reformat code.

* Simplify commands for looking up celery worker processes (#5778)

* update doc- celery supports storage list. (#5776)

* Update introduction.rst

* Update introduction.rst

* Fail xfailed tests if the failure is unexpected.

* Added integration coverage for link_error (#5373)

* Added coverage for link_error.

* Use pytest-rerunfailed plugin instead of rolling our own custom implementation.

* Added link_error with retries. This currently fails.

* Remove unused import.

* Fix import on Python 2.7.

* retries in link_error do not hang the worker anymore.

* Run error callbacks eagerly when the task itself is run eagerly.

Fixes #4899.

* Adjust unit tests accordingly.

* Grammar in documentation (#5780)

* Grammar in documentation

* Address review.

* pypy 7.2 matrix (#5790)

* removed extra slashes in CELERY_BROKER_URL (#5792)

The Celery broker URL in settings.py ended with two slashes, which are not required and can be misleading,
so I changed
CELERY_BROKER_URL = 'amqp://guest:guest@localhost//' to CELERY_BROKER_URL = 'amqp://guest:guest@localhost'
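A minimal settings sketch of the corrected value. Note that the trailing `//` actually names the default vhost `/`, so it is redundant rather than wrong:

```python
# settings.py (sketch)
# 'amqp://guest:guest@localhost//' explicitly names the vhost '/';
# omitting the trailing slashes falls back to the same default vhost.
CELERY_BROKER_URL = 'amqp://guest:guest@localhost'
```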

* Fix #5772 task_default_exchange & task_default_exchange_type not work (#5773)

* Fix #5772 task_default_exchange & task_default_exchange_type not work

* Add unit test: test_setting_default_exchange

* Move default_exchange test to standalone class

* Run integration suite with memcached results backend. (#5739)

* Fix hanging forever when fetching results from a group(chain(group)) canvas. (#5744)

PR #5739 uncovered multiple problems with the cache backend.
This PR should resolve one of them.

PR #5638 fixed the same test case for our async results backends that support native join.
However, it did not fix the test case for sync results backends that support native join.

* Fix regression in PR #5681. (#5753)

See comment in the diff for details.

* Grammatical fix to CONTRIBUTING.rst doc (#5794)

* Fix #5734 Celery does not consider authMechanism on mongodb backend URLs (#5795)

* Fix #5734 Celery does not consider authMechanism on mongodb backend URLs

* Add unit test: test_get_connection_with_authmechanism

* Add unit test: test_get_connection_with_authmechanism_no_username

* Fix errors in Python 2.7

Remove "," after "**" operator

* Revert "Revert "Revert "Added handle of SIGTERM in BaseTask in celery/task.py to prevent kill the task" (#5577)" (#5586)" (#5797)

This reverts commit f79894e0a2c7156fd0ca5e8e3b652b6a46a7e8e7.

* Add Python 3.8 Support (#5785)

* Added Python 3.8 to the build matrix.

* Ensure a supported tblib version is installed for Python 3.8 and above.

In addition, modernize the relevant tests.

* Workaround patching problem in test.

* py 3.8 in classifier

* ubuntu bionic (#5799)

* ubuntu bionic

* fast finish

* sync bumpversion with pypi release

* Dev.req (#5803)

* update  docker config



* undo hardpin

* dev req install from github master

* update  docker config (#5801)

* update  docker config

* make dockerfile to install from github master dev branch by default

* update download link

* Isort.

* Grammatical & punctuation fixes for CONTRIBUTING.rst document (#5804)

* update dockerfile

* switched to ubuntu bionic

* update docker

* keep it empty until we reconfigure it again with autopep8

* Fixed Dockerfile (#5809)

* Update document CONTRIBUTING.rst & fix Dockerfile typo (#5813)

* Added an issue template for minor releases.

* reference gocelery Go Client/Server for Celery (#5815)

* Add enterprise language (#5818)

* Fix/correct minor doc typos (#5825)

* Correct a small typo

* Correct bad contributing documentation links

* Preserve the task priority in case of a retry (#5820)

* Preserve the task priority in case of a retry

* Created test case for retried tasks with priority

* Implement an integration test for retried tasks with priorities

* bump kombu

* basic changelog for celery 4.4.0rc4

* bump celery 4.4.0rc4

* events bootstep disabled if no events (#5807)

* events bootstep disabled if no events

* Added unit tests.

* update bug report template

* fixing ascii art to look nicer (#5831)

* Only rerun flaky tests when failures can be intermediary.

* Rename Changelog to Changelog.rst

* The test_nested_group_chain test can run without native_join support. (#5838)

* Run integration tests with Cassandra (#5834)

* Run integration tests with Cassandra.

* Configure cassandra result backend

* Pre-create keyspace and table

* Fix deprecation warning.

* Fix path to cqlsh.

* Increase connection timeout.

* Wait until the cluster is available.

* SQS - Reject on failure (#5843)

* reject on failure

* add documentation

* test fix

* test fix

* test fix

* Add a concurrency model with ThreadPoolExecutor (#5099)

* Add a concurrency model with ThreadPoolExecutor

* thread model test for pypy
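The thread pool added here builds on the standard library's executor; a self-contained sketch of the primitive it wraps (not Celery's actual pool implementation):

```python
from concurrent.futures import ThreadPoolExecutor

def apply_on_thread(fn, *args, max_workers=4):
    """Run fn(*args) on a pool thread and block for the result,
    mirroring a synchronous apply on a thread-based pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return pool.submit(fn, *args).result()

print(apply_on_thread(sum, [1, 2, 3]))  # prints 6
```

The worker selects this model with `--pool threads`.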

* Chain primitive's code example fix in canvas documentation (Regression PR#4444) (#5845)

* Changed multi-line string (#5846)

This string wasn't rendering properly and was printing the python statement too. Although the change isn't as pretty code-wise, it gets rid of an annoyance for the user.

* Add auto expiry for DynamoDB backend (#5805)

* Add auto expiry for DynamoDB backend

This adds auto-expire support for the DynamoDB backend, via the DynamoDB
Time to Live feature.

* Require boto3>=1.9.178 for DynamoDB TTL support

boto3 version 1.9.178 requires botocore>=1.12.178.

botocore version 1.12.178 introduces support for the DynamoDB
UpdateTimeToLive call.

The UpdateTimeToLive call is used by the DynamoDB backend to enable TTL
support on a newly created table.

* Separate TTL handling from table creation

Handle TTL enabling/disabling separately from the table get-or-create
function.

Improve handling of cases where the TTL is already set to the desired
state.

DynamoDB only allows a single TTL update action within a fairly long
time window, so some problematic cases (changing the TTL attribute,
enabling/disabling TTL when it was recently modified) will raise
exceptions that have to be dealt with.

* Handle older boto3 versions

If the boto3 TTL methods are not found, log an informative error. If the
user wants to enable TTL, raise an exception; if TTL should be disabled,
simply return.

* Improve logging

- Handle exceptions by logging the error and re-raising

- Log (level debug) when the desired TTL state is already in place

* Add and use _has_ttl() convenience method

Additional changes:

- Handle exceptions when calling boto3's describe_time_to_live()

- Fix test cases for missing TTL methods

* Update ttl_seconds documentation

* Log invalid TTL; catch and raise ValueError

* Separate method _get_table_ttl_description

* Separate ttl method validation function

* Clarify tri-state TTL value

* Improve test coverage

* Fix minor typo in comment
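DynamoDB's Time to Live feature deletes items whose designated attribute holds an epoch timestamp in the past; how a `ttl_seconds` setting maps onto that attribute can be sketched as follows (hypothetical helper, not the backend's actual code):

```python
import time

def ttl_attribute_value(ttl_seconds):
    """Epoch timestamp to store in an item's TTL attribute.

    Tri-state, mirroring the setting described above:
    None -> leave TTL configuration untouched, 0 -> TTL disabled,
    N > 0 -> item becomes eligible for deletion N seconds from now.
    """
    if ttl_seconds is None or ttl_seconds == 0:
        return None
    if ttl_seconds < 0:
        raise ValueError('ttl_seconds must be None or >= 0')
    return int(time.time()) + int(ttl_seconds)
```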

* Mark test as xfail when using the cache backend. (#5851)

* [Fix #5436] Store extending result in all backends (#5661)

* [Fix #5436] Store extending result in all backends

* Fix sqlalchemy

* More fixu

* Fixing tests

* removing not necessary import

* Removing debug code

* Removing debug code

* Add tests for get_result_meta in base and database

* Revert "Add auto expiry for DynamoDB backend (#5805)" (#5855)

This reverts commit f7f5bcfceca692d0e78c742a7c09c424f53d915b.

* Revert "Mark test as xfail when using the cache backend. (#5851)" (#5854)

This reverts commit 1b303c2968836245aaa43c3d0ff9249dd8bf9ed2.

* docs: Document Redis commands used by celery (#5853)

* remove cache back end integration test. (#5856)

* Fix a race condition when publishing a very large chord header (#5850)

* Added a test case which artificially introduces a delay to group.save().

* Fix race condition by delaying the task only after saving the group.

* update tox

* Remove duplicate boto dependency. (#5858)

* Revert "remove cache back end integration test. (#5856)" (#5859)

This reverts commit e0ac7a19a745dd5a52a615c1330bd67f2cef4d00.

* Revert "Revert "Add auto expiry for DynamoDB backend (#5805)" (#5855)" (#5857)

This reverts commit 4ddc605392d7694760f23069c34ede34b3e582c3.

* Revert "update tox"

This reverts commit 49427f51049073e38439ea9b3413978784a24999.

* Fix the test_simple_chord_with_a_delay_in_group_save test.

* Revert "Revert "Skip unsupported canvas when using the cache backend"" (#5860)

* Revert "Revert "Mark test as xfail when using the cache backend. (#5851)" (#5854)"

This reverts commit fc101c61c1912c4dafa661981f8b865c011e8a55.

* Make the xfail condition stricter.

* Fix the xfail condition.

* Linters should use Python 3.8.

* Move pypy unit tests to the correct stage.

* Temporarily allow PyPy to fail since it is unavailable in Travis.

* Remove unused variables.

* Fix unused imports.

* Fix pydocstyle errors in dynamodb.

* Fix pydocstyle errors in redis backend.

* bump kombu to 4.6.7

* celery 4.4.0rc5 changelog

* celery 4.4.0rc5

* rm redundant code (#5864)

* isort.

* Document the threads task pool in the CLI.

* Removed the paragraph about using librabbitmq. Refer to #5872 (#5873)

* Task class definitions can have retry attributes (#5869)

* autoretry_for
* retry_kwargs
* retry_backoff
* retry_backoff_max
* retry_jitter
can now be defined as cls attributes.

All of these can be overridden from the @task decorator

https://github.com/celery/celery/issues/4684
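The semantics behind `retry_backoff`, `retry_backoff_max` and `retry_jitter` can be sketched in plain Python (an illustrative helper, not Celery's internal implementation):

```python
import random

def retry_delay(retries, retry_backoff=True, retry_backoff_max=600,
                retry_jitter=False):
    """Seconds to wait before retry number `retries` (0-based)."""
    if not retry_backoff:
        return 0
    # Exponential backoff: 1, 2, 4, 8, ... capped at retry_backoff_max.
    delay = min(2 ** retries, retry_backoff_max)
    if retry_jitter:
        # Jitter spreads retries out to avoid synchronized retry storms
        # across many tasks hitting the same resource.
        delay = random.uniform(0, delay)
    return delay
```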

* whatsnew in Celery 4.4 as per projects standard (#5817)

* 4.4 whatsnew

* update

* update

* Move old whatsnew to history.

* Remove old news & fix markers.

* Added a section notifying Python 3.4 has been dropped.

* Added a note about ElasticSearch basic auth.

* Added a note about being able to replace eagerly run tasks.

* Update index.

* Address comment.

* Described boto3 version updates.

* Fix heading.

* More news.

* Thread pool.

* Add Django and Config changes

* Bump version 4.4.0

* update readme

* Update docs regarding Redis Message Priorities (#5874)

* Update docs regarding Redis Message Priorities

* fixup! Update docs regarding Redis Message Priorities

* Update 4.4.0 docs (#5875)

* Update 4.4 release changelog

* Update whatsnew-4.4

* Update tasks docs

* Fix recent tasks doc file update (#5879)

* Include renamed Changelog.rst in source releases. (#5880)

Changelog.rst was renamed from Changelog in
fd023ec174bedc2dc65c63a0dc7c85e425ac00c6 but MANIFEST.in was not updated to
include the new name. This fixes the file name so Changelog.rst will show up
in future source releases again.

* Reorganised project_urls and classifiers. (#5884)

* Use safequote in SQS Getting Started doc (#5885)
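kombu's `safequote` exists because AWS keys may contain `/` and `+`, which break URL parsing when embedded in a broker URL; a stdlib-only equivalent (a sketch, not kombu's code):

```python
from urllib.parse import quote

def safequote(value):
    # safe='' forces '/' (and '+') to be percent-encoded as well,
    # so the credentials can't be mistaken for URL structure.
    return quote(value, safe='')

aws_key = safequote('ABC/DEF+GHI')
broker_url = f'sqs://{aws_key}:{safequote("secret/key")}@'
```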

* Have appveyor build relevant versions of Python. (#5887)

* Have appveyor build relevant and buildable versions of Python.

* Appveyor is missing CI requirements to build.

* Pin pycurl to version that will build with appveyor (because wheels files exist)

* Restrict python 2.7 64 bit version of python-dateutil for parse.

* Use is_alive instead of isAlive for Python 3.9 compatibility. (#5898)

* Very minor tweak to comment to improve docs (#5900)

As discussed here: 
https://stackoverflow.com/questions/58816271/celery-task-asyncresult-takes-task-id-but-is-documented-to-get-asyncresult-inst
this comment seems to flow to a very confusing and misleading piece of documentation here:
https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.AsyncResult

* Support configuring schema of a PostgreSQL database (#5910)

* Support configuring schema of a PostgreSQL database

* Add unit test

* Remove blank line
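A config sketch of the feature; the setting name below is an assumption modeled on the existing `database_*` options, so check the configuration reference for your version:

```python
# celeryconfig.py (sketch; setting name assumed)
result_backend = 'db+postgresql://user:password@localhost/celerydb'
database_table_schemas = {
    'task': 'celery',   # task result table lives in schema "celery"
    'group': 'celery',  # group result table as well
}
```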

* Fix raise issue to make exception message more friendly (#5912)

Signed-off-by: Chenyang Yan <memory.yancy@gmail.com>

* Add progress for retry connections (#5915)

This shows the current retry progress, clearing up confusion about how many connection retries to the broker will be attempted.
Closes #4556

* chg: change xrange to range (#5926)

* update docs for json serializer and add note for int keys serialization (#5932)

* fix indentation for note block in calling.rst (#5933)

* Added links to other issue trackers. (#5939)

* Add labels automatically for issues. (#5938)

* Run pyupgrade.

Co-authored-by: Michal Čihař <michal@cihar.com>
Co-authored-by: ptitpoulpe <ptitpoulpe@ptitpoulpe.fr>
Co-authored-by: Didi Bar-Zev <didi@hiredscore.com>
Co-authored-by: Santos Solorzano <santosjavier22@gmail.com>
Co-authored-by: manlix <manlix@yandex.ru>
Co-authored-by: Jimmy <54828848+sckhg1367@users.noreply.github.com>
Co-authored-by: Борис Верховский <boris.verk@gmail.com>
Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
Co-authored-by: Jainal Gosaliya <jainal09gosalia@gmail.com>
Co-authored-by: gsfish <caoyu97@hotmail.com>
Co-authored-by: Dipankar Achinta <di.punk.car19@gmail.com>
Co-authored-by: Pengjie Song (宋鹏捷) <spengjie@sina.com>
Co-authored-by: Chris Griffin <chris-griffin@users.noreply.github.com>
Co-authored-by: Muhammad Hewedy <mhewedy@gmail.com>
Co-authored-by: Blaine Bublitz <blaine.bublitz@gmail.com>
Co-authored-by: Tamu <tamsanh@gmail.com>
Co-authored-by: Erik Tews <erik@datenzone.de>
Co-authored-by: abhinav nilaratna <anilaratna2@bloomberg.net>
Co-authored-by: Wyatt Paul <wpaul@hearsaycorp.com>
Co-authored-by: gal cohen <gal.nevis@gmail.com>
Co-authored-by: as <alfred@huji.fr>
Co-authored-by: Param Kapur <paramkapur2002@gmail.com>
Co-authored-by: Sven Ulland <sven.ulland@gmail.com>
Co-authored-by: Safwan Rahman <safwan.rahman15@gmail.com>
Co-authored-by: Aissaoui Anouar <tobia@crossbone.cc>
Co-authored-by: Neal Wang <qdzzyb2015@gmail.com>
Co-authored-by: Alireza Amouzadeh <alireza@amouzadeh.net>
Co-authored-by: Marcos Moyano <marcos@anue.biz>
Co-authored-by: Stepan Henek <stepan+github@henek.name>
Co-authored-by: Andrew Sklyarov <AndrewPix@users.noreply.github.com>
Co-authored-by: Michael Fladischer <michael@fladi.at>
Co-authored-by: Dejan Lekic <dejan.lekic@gmail.com>
Co-authored-by: Yannick Schuchmann <yannick.schuchmann@googlemail.com>
Co-authored-by: Matt Davis <matteius@gmail.com>
Co-authored-by: Karthikeyan Singaravelan <tir.karthi@gmail.com>
Co-authored-by: Bernd Wechner <bernd-wechner@users.noreply.github.com>
Co-authored-by: Sören Oldag <soeren_oldag@freenet.de>
Co-authored-by: uddmorningsun <memory.yancy@gmail.com>
Co-authored-by: Amar Fadil <34912365+marfgold1@users.noreply.github.com>
Co-authored-by: woodenrobot <woodenrobot1993@gmail.com>
Co-authored-by: Sardorbek Imomaliev <sardorbek.imomaliev@gmail.com>

* Remove fallback code for Python 2 support marked with TODOs. (#5953)

Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>

* Remove PY3 conditionals (#5954)

* remove redundant raise from docstring (#5941)

`throw` is True by default so the Retry exception will already get raised by calling `self.retry(countdown=60 * 5, exc=exc)`
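A minimal stand-in illustrating why the extra `raise` never runs (illustrative classes, not Celery's):

```python
class Retry(Exception):
    """Stand-in for celery.exceptions.Retry."""

class Task:
    def retry(self, countdown=0, exc=None, throw=True):
        retry_exc = Retry(f'retry in {countdown}s')
        if throw:
            # Default path: retry() raises by itself, so in
            # `raise self.retry(...)` the outer `raise` statement
            # is never reached.
            raise retry_exc
        return retry_exc
```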

* Run pyupgrade.

* Fix typo (#5943)

* Remove fallback code for Python 2 support.

* docs: fixes Rabbits and Warrens link in routing userguide (#4007) (#5949)

* Fix labels on Github issue templates. (#5955)

Use quotation marks to escape labels on Github issue templates. This
prevents the colon from breaking the template.

* added retry_on_timeout and socket_keepalive to config and doc (#5952)
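The two options map onto `redis_*` settings that are forwarded to redis-py (a config sketch using the names as added by this change):

```python
# celeryconfig.py (sketch)
result_backend = 'redis://localhost:6379/0'
redis_retry_on_timeout = True   # retry the command after a socket timeout
redis_socket_keepalive = True   # enable TCP keepalive on backend connections
```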

* Fixed event capture from building infinite list (#5870)

* Fix error propagation example (#5966)

* update range (#5971)

* update setup.cfg

* bump billiard to 3.6.3.0

* Update __init__.py (#5951)

* Update __init__.py

Fixed issue for object with result_backend=True (decode fails on multiple None request)

* Update __init__.py

suggested changes

* Update __init__.py

* Use configured db schema also for sequences (#5972)

* Added a default value for retries in worker.strategy. (#5945)

* Added a default value for retries in worker.strategy.

I was facing an issue when adding tasks directly to rabbitmq
using pika instead of calling task.apply_async. The issue was that
the self.retry mechanism was failing. In app/tasks.py the line
`retries = request.retries + 1` was causing the issue. On further
tracing I figured out that it was because the default .get value
(None) was getting passed through this function, raising
TypeError: unsupported operand type(s) for +: 'NoneType' and 'int'

* Add test cases for default and custom retries value
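The failure is easy to reproduce in isolation: a message published straight to the broker can lack the retries header, and without a default the increment is `None + 1` (a plain-Python sketch of the guard, with hypothetical names):

```python
def next_retry_count(request_retries=None):
    # request.retries is absent (None) for messages published directly
    # with e.g. pika; defaulting to 0 avoids
    # TypeError: unsupported operand type(s) for +: 'NoneType' and 'int'
    retries = 0 if request_retries is None else request_retries
    return retries + 1
```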

* pypy 7.3 (#5980)

* Pass `interval` to `get_many` (#5931)

* Pass `interval` to `get_many`

* Fix: Syntax error for py2.7

* Fix: Syntax error for py2.7

* Fixed problem with conflicting autoretry_for task parameter and Task.replace() (#5934)

* Fix #5917 (#5918)

* Fix changelog (#5881)

* merge in place the apps beat schedule in the default Schedule class. (#5908)

* Handle Redis connection errors in result consumer (#5921)

* Handle Redis connection errors in result consumer

* Closes #5919.

* Use context manager for Redis consumer reconnect

* Log error when result backend reconnection fails

* Fix inspect_command documentation (#5983)

* Use gevent and eventlet wait() functions to remove busy-wait (#5974)

* Use gevent and eventlet wait() functions to remove busy-wait

Fixes issue #4999.

Calling AsyncResult.get() in a gevent context would cause the async Drainer to repeatedly call wait_for until the result was completed.  I've updated the code to have a specific implementation for gevent and eventlet that causes wait_for to return only every `timeout` seconds, rather than repeatedly returning.

Some things I'd like some feedback on:
* Where's the best place to add test coverage for this?  It doesn't look like there are any tests that directly exercised the Drainer yet so I would probably look to add some of these to the backends/ unit tests.
* The way I did this for the Eventlet interface was to rely on the private _exit_event member of the GreenThread instance; to do this without relying on a private member would require some additional changes to the backend Drainer interface so that we could wait for an eventlet-specific event in wait_for().  I can do this, just wanted to get some feedback before.

* Add unit tests for Drainer classes

In order for this to work without monkeypatching in the tests, I needed to call sleep(0) to let the gevent/eventlet greenlets to yield control back to the calling thread.  I also made the check interval configurable in the drainer so that we didn't need to sleep multiples of 1 second in the tests.

* Weaken asserts since they don't pass on CI

* Fix eventlet auto-patching DNS resolver module on import

By default it looks like "import eventlet" imports the greendns module unless the environment variable EVENTLET_NO_GREENDNS is set to true.  This broke a pymongo test.

* Add tests ensuring that the greenlet loop isn't blocked

These tests make sure that while drain_events_until is running that other gevent/eventlet concurrency can run.

* Clean up tests and make sure they wait for all the threads to stop

* Fix chords with chained groups (#5947)

* kombu 4.6.8

* update setup

* updated version 4.4.1

* Fix: Accept and swallow `kwargs` to handle unexpected keyword arguments

* Allow boto to look for credentials in S3Backend

* add reference to Rusty Celery

* Update document of revoke method in Control class

* Fix copy-paste error in result_compression docs

* Make 'socket_keepalive' optional variable (#6000)

* update connection params - socket_keepalive is optional now

* update readme - added versionadded 4.4.1 and fixed `redis_socket_keepalive`

* added check of socket_keepalive in arguments for UnixSocketConnect

* Fixed incorrect setting name in documentation (#6002)

* updated version 4.4.2

* Fix backend utf-8 encoding in s3 backend

The Celery backend uses utf-8 to deserialize results,
which fails for some serializers such as pickle.
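The failure mode is easy to reproduce in a few lines: pickled payloads are raw bytes, not UTF-8 text, so decoding them unconditionally raises:

```python
import pickle

payload = pickle.dumps({'result': 42})

# Pickle output is arbitrary binary data (protocol 2+ even starts with
# the 0x80 byte, which is never valid at the start of a UTF-8 sequence),
# so decoding it as utf-8 raises UnicodeDecodeError.
try:
    payload.decode('utf-8')
    is_valid_utf8 = True
except UnicodeDecodeError:
    is_valid_utf8 = False
```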

* Fix typo in celery.bin.multi document

* Upgraded pycurl to the latest version that supports wheel.

* pytest 5.3.5 max

* Add uptime to the stats inspect command

* Doc tweaks: mostly grammar and punctuation (#6016)

* Fix a bunch of comma splices in the docs

* Remove some unnecessary words from next-steps doc

* Tweak awkward wording; fix bad em-dash

* Fix a bunch more comma splices in next-steps doc

* Miscellaneous grammar/punctuation/wording fixes

* Link to task options in task decorator docs

* Fixing issue #6019: unable to use mysql SSL parameters when getting mysql engine (#6020)

* Fixing issue #6019: unable to use mysql SSL parameters in create_engine()

* adding test for get_engine when self.forked is False and engine args are passed in for create_engine()

* Clean TraceBack to reduce memory leaks for exception task (#6024)

* Clean TraceBack to reduce memory leaks

* add unit test

* add unit test

* reject unittest

* Patch For Python 2.7 compatibility

* update unittest

* Register to the garbage collector by explicitly referring to f_locals.

* need more check

* update code coverage

* update Missing unit test

* 3.4 -> 3.5

Co-authored-by: heedong.jung <heedong.jung@samsung.com>

* exceptions: NotRegistered: fix up language

Minor fix to the language.

* Note about autodiscover_tasks and periodic tasks

This is particularly important for Django projects that put periodic tasks into each app's `tasks.py`.  By the time `autodiscover_tasks()` loads those tasks, the `on_after_configure` Signal has already come and gone, so anything decorated with `@app.on_after_finalize.connect` will never be called.

If there's other documentation on this subject, I could not find it.

* Avoid PyModulelevel, deprecated in Sphinx 4

Use `PyFunction` instead of `PyModulelevel` to avoid this
deprecation warning:

    RemovedInSphinx40Warning: PyModulelevel is deprecated.
    Please check the implementation of
    <class 'celery.contrib.sphinx.TaskDirective'>

This replacement is one of the options listed in the Sphinx docs
(https://www.sphinx-doc.org/en/master/extdev/deprecated.html).

* Give up sending a worker-offline message if transport is not connected (#6039)

* If worker-offline event fails to send, give up and die peacefully

* Add test for retry= and msgs in heartbeat

* Fix the build and all documentation warnings.

I finally upgraded our theme to 2.0.
As a result we've upgraded Sphinx to 2.0.
Work to upgrade Sphinx to 3.0 will proceed in a different PR.

This upgrade also fixes our build issues caused by #6032.
We don't support Sphinx 1.x as a result of that patch.

I've also included the missing 4.3 changelog to our history.

* Support both Sphinx 2 and 3.

* Add Task to __all__ in celery.__init__.py

* Add missing parenthesis to example in docs

* Ensure a single chain object in a chain does not raise MaximumRecursionError.

Previously chain([chain(sig)]) would crash.
We now ensure it doesn't.

Fixes #5973.

* update setup.py

* fix typo

missing quote at the end of line

* Fix a typo in monitoring doc

* update travis

* update ubuntu to focal foss

20.04 LTS

* Fix autoscale when prefetch_multiplier is 1

* Allow start_worker to function without ping task

* Update celeryd.conf

Move the directory of the program before the execution of the command/script

* Add documentation for "predefined_queue_urls"

* [Fix #6074]: Add missing documentation for MongoDB as result backend.

* update funding

* 🐛 Correctly handle configuring the serializer for always_eager mode. (#6079)

* 🐛 Correctly handle configuring the serializer for always_eager mode.

options['serializer'] will always exist, because it is initialized from a mattrgetter. Even if unset, it will be present in the options with a value of None.

* 🐛 Add a test for new always_eager + task_serializer behavior.

* ✏️ Whoops missed a :

* Remove doubling of prefetch_count increase when prefetch_multiplier gt 1 (#6081)

* try ubuntu focal (#6088)

* Fix eager function not returning result after retries.

Using the apply function does not return correct results after at least one
retry, because the return value of the successive call does not go back to
the original caller.

* return retry result if not throw and is_eager

if throw is false, we are interested in the result of the retry and
not the current result, which will be an exception.

This way it does not break the logic of `raise self.retry`

This should be used like `return self.retry(..., throw=False)` in an
except statement.

* revert formatting change

* Add tests for eager retry without throw

* update predefined-queues documentation

The suggested configuration does not work.
Additionally, I'd like to mention that `access_key_id` and `secret_access_key` are mandatory fields that do not allow you to fall back to the default AWS_* environment variables.
I can contribute a change to make these variables optional.

Also, I'm not sure whether a security token will apply; could you please advise how to do it?

* Fix couchbase version < 3.0.0 as API changed

* Remove reference to -O fair in optimizations

-O fair was made the default in Celery 4.0
https://docs.celeryproject.org/en/stable/history/whatsnew-4.0.html#ofair-is-now-the-default-scheduling-strategy

* pytest ranges

* pypy3

* revert to bionic

* do not load docs.txt requirements for python 2.7

As it requires Sphinx >= 2.0.0 and there is no such version compatible
with python 2.7

* update cassandra travis integration test configuration

cassandra:latest docker image changed location of cqlsh program

* pin cassandra-driver

CI gets stuck after all cassandra integration tests

* Fix all flake8 lint errors

* Fix all pydocstyle lint errors

* Fix all configcheck lint errors

* Always requeue while worker lost regardless of the redelivered flag (#6103)

* #5598 fix: always redeliver on WorkerLostError

* fix, change the requeue flag so the task will remain PENDING

* Allow relative paths in the filesystem backend (#6070)

* Allow relative paths in the filesystem backend

* fix order of if statements

* [Fixed Issue #6017]
--> Added Multi default logfiles and pidfiles paths

[Description]:
--> Changed the default paths for log files & pid files to '/var/log/celery' and '/var/run/celery'
--> Handled by creating the respective paths if they do not exist.
--> Used os.makedirs(path, exist_ok=True)

[Unit Test Added]:
--> .travis.yml - config updated with 'before install'.
--> t/unit/apps/test_multi.py - Changed the default log files & pid files paths wherever required.

* Avoid race condition due to task duplication.

In some circumstances, like a network partition, some tasks might
be duplicated. Sometimes this leads to a race condition where a lost
task overwrites the result of the last successful task in the backend.

In order to avoid this race condition we prevent updating the result if
it's already in successful state.

This fix has been done for KV backends only and therefore won't work
with other backends.
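A toy sketch of the guard, assuming a dict-backed key/value store (the class and method names are illustrative, not Celery's backend API):

```python
SUCCESS = 'SUCCESS'
READY_STATES = {SUCCESS, 'FAILURE', 'REVOKED'}


class KVStore:
    """Toy key/value result store with the duplicate-task guard."""

    def __init__(self):
        self._data = {}

    def store_result(self, task_id, state, result):
        current = self._data.get(task_id)
        # Never let a duplicate (e.g. lost-then-rerun) task overwrite a
        # result that already reached SUCCESS.
        if current and current['state'] == SUCCESS:
            return current
        self._data[task_id] = {'state': state, 'result': result}
        return self._data[task_id]
```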

* adding tests

* Exceptions must be old-style classes or derived from BaseException,
but here self.result may not be a subclass of BaseException.

* update fund link

* Fix windows build (#6104)

* do not load memcache nor couchbase lib during windows build

those libraries depend on the native libraries libcouchbase and libmemcached
that are not installed on Appveyor.
As only unit tests run on Appveyor, it should be fine

* Add python 3.8 workaround for app trap

* skip tests file_descriptor_safety tests on windows

AsyncPool is not supported on Windows, so Pool does not have a _fileno_to_outq
attribute, making the test fail

* Fix crossplatform log and pid files in multi mode

it relates to #6017

* Use tox to build and test on windows

* remove tox_install_command

* drop python 2.7 from windows build

* Add encode to meta task in base.py (#5894)

* Add encode to base.py meta result

Fix a bug where it was impossible to load None from the task meta

* Add tests for None. Remove exceed encode.

* Update base.py

Add return payload if None

* Update time.py to solve the microsecond issues (#5199)

When `relative` is set to True, the day, hour, minute and second will be rounded to the nearest one; however, the original program does not reset the microsecond. As a result, the run-time offset in the microseconds accumulates.
For example, given an interval of 15s with relative set to True:
For example, given the interval is 15s and relative is set to True
1.    2018-11-27T15:01:30.123236+08:00
2.    2018-11-27T15:01:45.372687+08:00
3.    2018-11-27T15:02:00.712601+08:00
4.    2018-11-27T15:02:15.987720+08:00
5.    2018-11-27T15:02:31.023670+08:00
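An illustrative fix (not the actual time.py code; `round_to_interval` is a hypothetical helper) is to clear the microsecond before rounding to the interval boundary:

```python
from datetime import datetime, timedelta


def round_to_interval(now, interval_seconds):
    """Round `now` up to the next interval boundary, dropping microseconds.

    Without `microsecond=0`, the sub-second part of `now` survives the
    rounding and the drift accumulates across runs, as in the timestamps
    above.
    """
    now = now.replace(microsecond=0)
    seconds = now.second + now.minute * 60 + now.hour * 3600
    remainder = seconds % interval_seconds
    if remainder:
        now += timedelta(seconds=interval_seconds - remainder)
    return now
```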

* Change backend _ensure_not_eager error to warning

* Add priority support for 'celery.chord_unlock' task (#5766)

* Change eager retry behaviour

Even with raise self.retry, it should return the eventual value
or MaxRetriesExceededError.
If the return value of an eager apply is a Retry exception, retry the
task signature eagerly.

* Order supported Python versions

* Avoid race condition in elasticsearch backend

If a task is retried, the task retry may run concurrently with the current task.
store_result calls may then arrive out of order.
This can cause a non-ready state (Retry) to override a ready state (Success, Failure).
If this happens, any chord depending on this task will block indefinitely.

This change makes document updates safe for concurrent writes.

https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html

* backends base get_many pass READY_STATES arg

* test backends base get_many pass READY_STATES arg

* Add integration tests for Elasticsearch and fix _update

* Revert "revert to bionic"

This reverts commit 6e091573f2ab0d0989b8d7c26b677c80377c1721.

* remove jython check

* feat(backend): Adds cleanup to ArangoDB backend

* Delete Document Known Issue with CONN_MAX_AGE in 4.3

* issue 6108: fix filesystem backend cannot be serialized by pickle (#6120)

* issue 6108: fix filesystem backend cannot be serialized by pickle

https://github.com/celery/celery/issues/6108

* issue-6108 fix unit test failure

* issue-6108 fix flake8 warning

Co-authored-by: Murphy Meng <mmeng@mirriad.com>

* kombu==4.6.9 (#6133)

* changelog for 4.4.3

* v 4.4.3

* remove unsupported classifier

* Fix autoretry_for with explicit retry (#6138)

* Add tests for eager task retry

* Fixes #6135

If autoretry_for is set too broadly (e.g. on Exception), then autoretry may catch a Retry.
If that's the case, rethrow it directly instead of wrapping it in another Retry,
to avoid losing the new args

* Use Django DB max age connection setting (fixes #4116)

* Add retry on recoverable exception for the backend (#6122)

* Add state to KeyValueStoreBackend.set method

This way, a backend implementation is able to take decisions based on
current state to store meta in case of failures.

* Add retry on recoverable exception for the backend

acks.late makes celery acknowledge messages only after processing and
storing result on the backend.

However, if the backend is unreachable, it will shadow a Retry
exception, mark the task as failed in the backend without retrying the
task, and acknowledge it on the broker.

With this new result_backend_always_retry setting, if the backend
exception is recoverable (to be defined per backend implementation),
it will retry the backend operation with an exponential backoff.
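A minimal sketch of the retry-with-backoff idea, assuming a hypothetical `retry_backend_op` helper and a per-backend `is_recoverable` predicate (these names are illustrative, not Celery's actual API):

```python
import time


def retry_backend_op(op, is_recoverable, max_retries=5, base=0.5):
    """Retry `op` on recoverable errors with exponential backoff.

    `is_recoverable` stands in for the per-backend decision about which
    exceptions (e.g. connection errors) are worth retrying.
    """
    for attempt in range(max_retries + 1):
        try:
            return op()
        except Exception as exc:
            if not is_recoverable(exc) or attempt == max_retries:
                raise
            # Exponential backoff: base, 2*base, 4*base, ...
            time.sleep(base * (2 ** attempt))
```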

* Make elasticsearch backward compatible with 6.x

* Make ES retry storing updates in a better way

if the existing value in the backend is success, then do nothing.
if it is a ready status, then update it only if the new value is a ready status as well.
else update it.

This way, a SUCCESS cannot be overridden, so that we do not lose
results, but any ready state other than success (FAILURE, REVOKED) can
be overridden by another ready status (i.e. a SUCCESS)
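The precedence rules above can be captured in a small helper; `should_overwrite` is a hypothetical name for illustration, while the state names match Celery's:

```python
READY_STATES = {'SUCCESS', 'FAILURE', 'REVOKED'}


def should_overwrite(existing_state, new_state):
    """Decide whether a concurrent write may replace the stored state.

    - an existing SUCCESS is never replaced;
    - another ready state (FAILURE, REVOKED) may only be replaced by a
      ready state, so a late Retry cannot clobber a final result;
    - anything else (PENDING, RETRY, ...) may be replaced freely.
    """
    if existing_state == 'SUCCESS':
        return False
    if existing_state in READY_STATES:
        return new_state in READY_STATES
    return True
```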

* Add test for value not found in ES backend

* Fix random distribution of jitter for exponential backoff

random.randrange should be called with the actual maximum so that all numbers
have equal probability; otherwise the maximum value has a much higher
probability of occurring.
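A sketch of full-jitter exponential backoff with a correctly bounded `randrange`; the function name and parameters are illustrative, not Celery's actual helper:

```python
import random


def backoff_with_jitter(retries, factor=1, maximum=300):
    """Full-jitter exponential backoff.

    The countdown grows as factor * 2**retries, capped at `maximum`.
    randrange(countdown + 1) draws uniformly from 0..countdown
    inclusive, so every value in the range has equal probability.
    """
    countdown = min(maximum, factor * (2 ** retries))
    return random.randrange(countdown + 1)
```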

* fix unit test if extra modules are not present

* ElasticSearch: add setting to save meta as json

* fix #6136: celery 4.4.3 always tries to create the /var/run/celery directory (#6142)

* fix #6136: celery 4.4.3 always tries to create the /var/run/celery directory, even if it's not needed.

* fix #6136. cleanup

* Add task_internal_error signal (#6049)

* Add internal_error signal

There is no dedicated signal for an out-of-body error, which can be the
result of a bad result backend.

* Fix syntax error.

* Document the task_internal_error signal.

Co-authored-by: Laurentiu Dragan <ldragan@bloomberg.net>

* changelog for v4.4.4

* kombu 4.6.10 (#6144)

* v4.4.4

* Add missing dependency on future (#6146)

Fixes #6145

* ElasticSearch: Retry index if document was deleted between index and update (#6140)

* ElasticSearch: Retry index if document was deleted between index and update

* Elasticsearch increase coverage to 100%

* Fix pydocstyle

* Specify minimum version of Sphinx for Celery extension (#6150)

The Sphinx extension requires Sphinx 2 or later due to #6032.

* fix windows build

* fix flake8 error

* fix multi tests in local

Mock os.mkdir and os.makedirs to avoid creating /var/run/celery and
/var/log/celery during unit tests if run without root privileges

* Customize the retry interval of chord_unlock tasks

* changelog v4.4.5

* v4.4.5

* Fix typo in comment.

* Remove autoscale force_scale methods (#6085)

* Remove autoscale force_scale methods

* Remove unused variable in test

* Pass ping destination to request

The destination argument worked fine from CLI but didn't get used when calling ping from Python.

* Fix autoscale test

* chord: merge init options with run options

* put back KeyValueStoreBackend.set method without state

It turns out it was breaking some other projects.
Wrapping the set method with _set_with_state means it will not break existing backends,
while still enabling this feature for other backends.

Currently, only ElasticsearchBackend supports this feature.

It protects concurrent update to corrupt state in the backend.
An existing success cannot be overridden, nor can a ready state be overridden by a non-ready state.
i.e. a Retry state cannot override a Success or Failure.
As a result, the chord_unlock task will not loop forever due to a missing ready state in the backend.

* added --range-prefix option to `celery multi` (#6180)

* added --range-prefix option to `celery multi`

Added an option for overriding the default range prefix when running
multiple workers providing a range with the `celery multi` command.

* covered multi --range-prefix with tests

* fixed --range-prefix test

* Added as_list function to AsyncResult class (#6179)

* Add as_list method to return task IDs as a list

* Add a test for as_list method

* Add docstring for as_list method

* Fix CassandraBackend error in threads or gevent pool (#6147)

* Fix CassandraBackend error in threads or gevent pool
        * remove CassandraBackend.process_cleanup

* Add test case

* Add test case

* Add comments test_as_uri

Co-authored-by: baixue <baixue@wecash.net>

* changelog for v4.4.6

* v4.4.6

* Update Wiki link in "resources"

In the page linked below, the link to wiki is outdated. Fixed that. 

https://docs.celeryproject.org/en/stable/getting-started/resources.html

* test_canvas: Add test for chord-in-chain

Add test case for the issue where a chord in a chain does not
work when using .apply(). This works fine with .apply_async().

* Trying to fix flaky tests in ci

* fix pydocstyle errors

* fix pydocstyle

* Drainer tests, put a lower constraint on number of intervals

liveness should iterate 10 times per interval while drain_events only
once. However, as it may use threads that can be scheduled out of
order, we may end up in a situation where liveness and drain_events
were called the same number of times.

Lowering the constraint from < to <= to avoid failing the tests.

* pyupgrade.

* Fix merge error.

Co-authored-by: Борис Верховский <boris.verk@gmail.com>
Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
Co-authored-by: Jainal Gosaliya <jainal09gosalia@gmail.com>
Co-authored-by: gsfish <root@grassfish.net>
Co-authored-by: Dipankar Achinta <di.punk.car19@gmail.com>
Co-authored-by: spengjie <spengjie@sina.com>
Co-authored-by: Chris Griffin <chris-griffin@users.noreply.github.com>
Co-authored-by: Muhammad Hewedy <mhewedy@gmail.com>
Co-authored-by: Blaine Bublitz <blaine.bublitz@gmail.com>
Co-authored-by: Tamu <tamsanh@gmail.com>
Co-authored-by: Erik Tews <erik@datenzone.de>
Co-authored-by: abhinav nilaratna <anilaratna2@bloomberg.net>
Co-authored-by: Wyatt Paul <wpaul@hearsaycorp.com>
Co-authored-by: gal cohen <gal.nevis@gmail.com>
Co-authored-by: whuji <alfred@huji.fr>
Co-authored-by: Param Kapur <paramkapur2002@gmail.com>
Co-authored-by: Sven Ulland <sven.ulland@gmail.com>
Co-authored-by: Safwan Rahman <safwan.rahman15@gmail.com>
Co-authored-by: Aissaoui Anouar <tobia@crossbone.cc>
Co-authored-by: Neal Wang <qdzzyb2015@gmail.com>
Co-authored-by: Alireza Amouzadeh <alireza@amouzadeh.net>
Co-authored-by: Marcos Moyano <marcos@anue.biz>
Co-authored-by: Stepan Henek <stepan+github@henek.name>
Co-authored-by: Andrew Sklyarov <AndrewPix@users.noreply.github.com>
Co-authored-by: Michael Fladischer <michael@fladi.at>
Co-authored-by: Dejan Lekic <dejan.lekic@gmail.com>
Co-authored-by: Yannick Schuchmann <yannick.schuchmann@googlemail.com>
Co-authored-by: Matt Davis <matteius@gmail.com>
Co-authored-by: Xtreak <tir.karthi@gmail.com>
Co-authored-by: Bernd Wechner <bernd-wechner@users.noreply.github.com>
Co-authored-by: Sören Oldag <soeren_oldag@freenet.de>
Co-authored-by: uddmorningsun <memory.yancy@gmail.com>
Co-authored-by: Amar Fadil <34912365+marfgold1@users.noreply.github.com>
Co-authored-by: woodenrobot <woodenrobot1993@gmail.com>
Co-authored-by: Sardorbek Imomaliev <sardorbek.imomaliev@gmail.com>
Co-authored-by: Alex Riina <alex.riina@gmail.com>
Co-authored-by: Joon Hwan 김준환 <xncbf12@gmail.com>
Co-authored-by: Prabakaran Kumaresshan <k_prabakaran+github@hotmail.com>
Co-authored-by: Martey Dodoo <martey@mobolic.com>
Co-authored-by: Konstantin Seleznev <4374093+Seleznev-nvkz@users.noreply.github.com>
Co-authored-by: Prodge <Prodge@users.noreply.github.com>
Co-authored-by: Abdelhadi Dyouri <raryat@gmail.com>
Co-authored-by: Ixiodor <Ixiodor@users.noreply.github.com>
Co-authored-by: abhishekakamai <47558404+abhishekakamai@users.noreply.github.com>
Co-authored-by: Allan Lei <allanlei@helveticode.com>
Co-authored-by: M1ha Shvn <work_shvein_mihail@mail.ru>
Co-authored-by: Salih Caglar Ispirli <caglarispirli@gmail.com>
Co-authored-by: Micha Moskovic <michamos@gmail.com>
Co-authored-by: Chris Burr <chrisburr@users.noreply.github.com>
Co-authored-by: Dave King <tildedave@gmail.com>
Co-authored-by: Dmitry Nikulin <v.dmitry.nikulin@gmail.com>
Co-authored-by: Michael Gaddis <mgaddis@ancestry.com>
Co-authored-by: epwalsh <epwalsh10@gmail.com>
Co-authored-by: TalRoni <tal.inon.16@gmail.com>
Co-authored-by: Leo Singer <leo.singer@ligo.org>
Co-authored-by: Stephen Tomkinson <neonbunny@users.noreply.github.com>
Co-authored-by: Abhishek <abhishek.shukla@xeneta.com>
Co-authored-by: theirix <theirix@gmail.com>
Co-authored-by: yukihira1992 <hirayama@cancerscan.jp>
Co-authored-by: jpays <jerome.pays@cnp.fr>
Co-authored-by: Greg Ward <greg@gerg.ca>
Co-authored-by: Alexa Griffith <agriffith@bluecore.com>
Co-authored-by: heedong <63043496+heedong-jung@users.noreply.github.com>
Co-authored-by: heedong.jung <heedong.jung@samsung.com>
Co-authored-by: Shreyansh Khajanchi <shreyanshk@users.noreply.github.com>
Co-authored-by: Sam Thompson <georgedorn@users.noreply.github.com>
Co-authored-by: Alphadelta14 <alpha@pokesplash.net>
Co-authored-by: Azimjon Pulatov <azimjohn@yahoo.com>
Co-authored-by: ysde <ysde108@gmail.com>
Co-authored-by: AmirMohammad Ziaei <amir_zia@outlook.com>
Co-authored-by: Ben Nadler <ben.nadler@gusto.com>
Co-authored-by: Harald Nezbeda <hn@nezhar.com>
Co-authored-by: Chris Frisina <github@specialorange.org>
Co-authored-by: Adam Eijdenberg <adam.eijdenberg@digital.gov.au>
Co-authored-by: rafaelreuber <rafaelreuber@gmail.com>
Co-authored-by: Noah Kantrowitz <noah@coderanger.net>
Co-authored-by: Ben Nadler <jbennadler@gmail.com>
Co-authored-by: Clement Michaud <c.michaud@criteo.com>
Co-authored-by: Mathieu Chataigner <m.chataigner@criteo.com>
Co-authored-by: eugeneyalansky <65346459+eugeneyalansky@users.noreply.github.com>
Co-authored-by: Leonard Lu <leonard@socialcodeinc.com>
Co-authored-by: XinYang <yangxinhust@hotmail.com>
Co-authored-by: Ingolf Becker <ingolf.becker@googlemail.com>
Co-authored-by: Anuj Chauhan <an12ch98@gmail.com>
Co-authored-by: shaoziwei <ziwei.shao@cloudchef.io>
Co-authored-by: Mathieu Chataigner <mathieu.chataigner@gmail.com>
Co-authored-by: Anakael <shemeldima2@yandex.ru>
Co-authored-by: Danny Chan <danny.chan@c-k.dev>
Co-authored-by: Sebastiaan ten Pas <sebastiaan@diggimedia.nl>
Co-authored-by: David TILLOY <d.tilloy@criteo.com>
Co-authored-by: Anthony N. Simon <anthonynsimon@users.noreply.github.com>
Co-authored-by: lironhl <liron.lavy@gmail.com>
Co-authored-by: Raphael Cohen <raphael.cohen@sekoia.fr>
Co-authored-by: JaeyoungHeo <jay.jaeyoung@gmail.com>
Co-authored-by: singlaive <singlaive@gmail.com>
Co-authored-by: Murphy Meng <mmeng@mirriad.com>
Co-authored-by: Wu Haotian <whtsky@gmail.com>
Co-authored-by: Kwist <velnik@gmail.com>
Co-authored-by: Laurentiu Dragan <ldragan@bloomberg.net>
Co-authored-by: Michal Čihař <michal@cihar.com>
Co-authored-by: Radim Sückr <radim.suckr@gmail.com>
Co-authored-by: Artem Vasilyev <artem.v.vasilyev@gmail.com>
Co-authored-by: kakakikikeke-fork <kakakikikeke_new@yahoo.co.jp>
Co-authored-by: Pysaoke <pysaoke@gmail.com>
Co-authored-by: baixue <baixue@wecash.net>
Co-authored-by: Prashant Sinha <prashantsinha@outlook.com>
Co-authored-by: AbdealiJK <abdealikothari@gmail.com>

* Remove Python 2 compatibility code from Celery (#6221)

* Remove five from celery/__init__.py

* Remove five from celery/beat.py

* Remove five from celery/bootsteps.py

* Remove five from celery/exceptions.py

* Remove five from celery/local.py

* Remove five from celery/platforms.py

* Remove five from celery/result.py

* Remove five from celery/schedules.py

* Remove five from celery/app/amqp.py

* Remove five from celery/app/annotations.py

* Remove five from celery/app/backends.py

* Remove five from celery/app/base.py

* Remove five from celery/app/control.py

* Remove five from celery/app/defaults.py

* Remove five from celery/app/log.py

* Remove five from celery/app/registry.py

* Remove five from celery/app/routes.py

* Remove five from celery/app/task.py

* Remove five from celery/app/trace.py

* Remove five from celery/app/utils.py

* Remove five from celery/apps/beat.py

* Remove five from celery/apps/multi.py

* Remove five from celery/apps/worker.py

* Remove five from celery/backends/database/__init__.py

* Remove five from celery/backends/amqp.py

* Remove five from celery/backends/asynchronous.py

* Remove five from celery/backends/base.py

* Remove five from celery/backends/dynamodb.py

* Remove five from celery/backends/elasticsearch.py

* Remove five from celery/backends/mongodb.py

* Remove five from celery/backends/redis.py

* Remove five from celery/backends/rpc.py

* Remove five from celery/concurrency/asynpool.py

* Remove five from celery/concurrency/base.py

* Remove five from celery/concurrency/prefork.py

* Remove five from celery/contrib/testing/manager.py

* Remove five from celery/contrib/migrate.py

* Remove five from celery/contrib/rdb.py

* Remove five from celery/events/cursesmon.py

* Remove five from celery/events/dispatcher.py

* Remove five from celery/events/state.py

* Remove five from celery/loaders/base.py

* Remove five from celery/security/certificate.py

* Remove five from celery/security/utils.py

* Remove five from celery/task/base.py

* Remove five from celery/utils/dispatch/signal.py

* Remove five from celery/utils/abstract.py

* Remove five from celery/utils/collections.py

* Remove five from celery/utils/debug.py

* Remove five from celery/utils/functional.py

* Remove five from celery/utils/graph.py

* Remove five from celery/utils/imports.py

* Remove five from celery/utils/log.py

* Remove five from celery/utils/saferepr.py

* Remove five from celery/utils/serialization.py

* Remove five from celery/utils/term.py

* Remove five from celery/utils/text.py

* Remove five from celery/utils/threads.py

* Remove five from celery/utils/time.py

* Remove five from celery/utils/timer2.py

* Remove five from celery/consumer/consumer.py

* Remove five from celery/consumer/gossip.py

* Remove five from celery/consumer/mingle.py

* Remove five from celery/worker/autoscale.py

* Remove five from celery/worker/components.py

* Remove five from celery/worker/control.py

* Remove five from celery/worker/request.py

* Remove five from celery/worker/state.py

* Remove five from celery/worker/worker.py

* Remove five from celery/t/benchmarks/bench_worker.py

* Remove five from celery/t/integration/test_canvas.py

* Remove five from celery/t/unit/app

* Remove five from celery/t/unit/backends

* Remove five from celery/t/unit/compat_modules

* Remove five from celery/t/unit/concurrency

* Remove five from celery/t/unit/contrib

* Remove five from celery/t/unit/events

* Remove five from celery/t/unit/security

* Remove five from celery/t/unit/tasks

* Remove five from celery/t/unit/utils

* Remove five from celery/t/unit/worker

* Sort imports.

* Comment out PyPy for now.

* Remove flakeplus.

* Happify linter.

* Fix merge problems.

* Delete backport.

* Remove unused import.

* Remove logic that notifies user that the Python version isn't supported from setup.py.

pip already does that for us.

* Add a trove classifier to indicate Celery only supports Python 3.

* Restore usage of `reraise` for consistency with the kombu port.

* Drop Python 2 compatibility code from our Sphinx extension.

* Remove mention of flakeplus from tox.ini.

* Remove mention of flakeplus from our CONTRIBUTING guide.

* Bump Sphinx requirement.

* Remove Python 2 compatibility code from our custom Sphinx extension.

* Resolve Sphinx warning due to removed section in 32ff7b45aa3d78aedca61b6554a9db39122924fd.

* Remove pydocstyle from build matrix as it was removed from master.

See #6278.

* Bump version: 4.4.7 → 5.0.0-alpha1

* Final touches.

* Fix README.

* Bump Kombu to 5.0.0.

* Bump version: 5.0.0-alpha1 → 5.0.0a2

* Fix wrong version.

* Remove autodoc for removed module.

* Remove documentation for removed methods.

* Remove the riak backend since riak is no longer maintained.

* Remove riak backend since riak is no longer maintained.

* Start fresh.

* Added all arguments for the celery worker command.

Still needs more documentation and improvements...

* Load the application and execute a worker.

* Added the rest of the global options.

If an app is not specified we now use the default app.

In addition, we now exit with the correct status code.

* Extract validation into parameter types.

* Restructure and document.

* Allow to pass worker configuration options from command line.

* Implement the beat command.

* Allow to configure celery options through the CLI.

* Implement the `celery call` command.

* Implement the `celery list bindings` command.

* Implement the `celery purge` command.

* Implement the `celery result` command.

* Implement the `celery migrate` task.

* Implemented the `celery status` command (reporting e.g. `celery@thedrow: OK`,
`1 node online.`).

* Take --no-color in consideration when outputting to stdout.

* Ensure `celery worker` takes `--no-color` into consideration.

* Use the preformatted OK string.

* Adopt the NO_COLOR standard.

See https://no-color.org/ for details.

* Split commands into separate files.

* Added 'did you mean' messages.

* Implement the `celery events` command.

* Text style should take --no-color into consideration as well.

* Implement the basic `celery inspect` command.

* Improve UI.

* Organize the code.

* Implement the `celery graph bootsteps` command.

* Implement the `celery graph workers` command.

* Implement the `celery upgrade settings` command.

* Implement the `celery report` command.

* Delete former unit tests.

* Implement the `celery logtool` command.

* Pass the quiet argument to the CLI context.

* Limit inspect to existing actions.

* Implement the `celery control` command.

* Basic scaffold for the `celery amqp` shell command.

* Start implementing the shell commands.

* Implement basic.publish and basic.get.

* Echo OK after acknowledgement.

* Reformat Code.

* Implement the exchange.declare command.

* Implement the exchange.delete command.

* Implement the queue.bind command.

* Implement the queue.declare command.

* Implement the queue.delete command.

* Echo queue.declare result to screen.

* Echo queue.delete result to screen.

* Implement the queue.purge command.

* Fix color support for error().

* Report errors and continue.

* Handle connection errors and reconnect on error.

* Refactor.

* Implement the `celery shell` command.

* Isort.

* Add documentation.

* Correct argument types.

* Implement detach for `celery worker`.

* Documentation.

* Implement detach for `celery beat`.

* Documentation.

* Implement the `celery multi` command.

* Documentation.

* Implement user options.

* Collect command actions from the correct registry.

* Isort.

* Fix access to app.

* Match arguments for control.

* Start fres…
"""Actual App instance implementation."""
import inspect
import os
import threading
import warnings
from collections import UserDict, defaultdict, deque
from datetime import datetime
from operator import attrgetter
from kombu import pools
from kombu.clocks import LamportClock
from kombu.common import oid_from
from kombu.utils.compat import register_after_fork
from kombu.utils.objects import cached_property
from kombu.utils.uuid import uuid
from vine import starpromise
from celery import platforms, signals
from celery._state import (_announce_app_finalized, _deregister_app,
_register_app, _set_current_app, _task_stack,
connect_on_app_finalize, get_current_app,
get_current_worker_task, set_default_app)
from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
from celery.loaders import get_loader_cls
from celery.local import PromiseProxy, maybe_evaluate
from celery.utils import abstract
from celery.utils.collections import AttributeDictMixin
from celery.utils.dispatch import Signal
from celery.utils.functional import first, head_from_fun, maybe_list
from celery.utils.imports import gen_task_name, instantiate, symbol_by_name
from celery.utils.log import get_logger
from celery.utils.objects import FallbackContext, mro_lookup
from celery.utils.time import timezone, to_utc
from . import backends
# Load all builtin tasks
from . import builtins # noqa
from .annotations import prepare as prepare_annotations
from .autoretry import add_autoretry_behaviour
from .defaults import DEFAULT_SECURITY_DIGEST, find_deprecated_settings
from .registry import TaskRegistry
from .utils import (AppPickler, Settings, _new_key_to_old, _old_key_to_new,
_unpickle_app, _unpickle_app_v2, appstr, bugreport,
detect_settings)
__all__ = ('Celery',)
logger = get_logger(__name__)
BUILTIN_FIXUPS = {
'celery.fixups.django:fixup',
}
USING_EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
ERR_ENVVAR_NOT_SET = """
The environment variable {0!r} is not set,
and as such the configuration could not be loaded.
Please set this variable and make sure it points to
a valid configuration module.
Example:
{0}="proj.celeryconfig"
"""
def app_has_custom(app, attr):
"""Return true if app has customized method `attr`.
Note:
This is used for optimizations in cases where we know
how the default behavior works, but need to account
for someone using inheritance to override a method/property.
"""
return mro_lookup(app.__class__, attr, stop={Celery, object},
monkey_patched=[__name__])
def _unpickle_appattr(reverse_name, args):
"""Unpickle app."""
# Given an attribute name and a list of args, gets
# the attribute from the current app and calls it.
return get_current_app()._rgetattr(reverse_name)(*args)
def _after_fork_cleanup_app(app):
    # This is used with multiprocessing.register_after_fork,
    # so it needs to be at module level.
try:
app._after_fork()
except Exception as exc: # pylint: disable=broad-except
logger.info('after forker raised exception: %r', exc, exc_info=1)
class PendingConfiguration(UserDict, AttributeDictMixin):
# `app.conf` will be of this type before being explicitly configured,
# meaning the app can keep any configuration set directly
# on `app.conf` before the `app.config_from_object` call.
#
# accessing any key will finalize the configuration,
# replacing `app.conf` with a concrete settings object.
callback = None
_data = None
def __init__(self, conf, callback):
object.__setattr__(self, '_data', conf)
object.__setattr__(self, 'callback', callback)
def __setitem__(self, key, value):
self._data[key] = value
def clear(self):
self._data.clear()
def update(self, *args, **kwargs):
self._data.update(*args, **kwargs)
def setdefault(self, *args, **kwargs):
return self._data.setdefault(*args, **kwargs)
def __contains__(self, key):
# XXX will not show finalized configuration
        # setdefault will cause `key in d` to happen,
        # so for setdefault to stay lazy, __contains__ must be lazy too.
return key in self._data
def __len__(self):
return len(self.data)
def __repr__(self):
return repr(self.data)
@cached_property
def data(self):
return self.callback()
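The laziness of `PendingConfiguration` can be sketched without Celery itself. The `LazyConf` class below is a hypothetical, minimal stand-in: writes go to a plain dict, and the `cached_property`-backed `data` attribute runs the finalization callback exactly once, on the first read.

```python
from functools import cached_property


class LazyConf:
    """Toy stand-in for PendingConfiguration: writes land in a plain
    dict, and only the first *read* runs the finalization callback."""

    def __init__(self, pending, callback):
        self._pending = pending
        self._callback = callback

    def __setitem__(self, key, value):
        self._pending[key] = value           # writes never finalize

    def __getitem__(self, key):
        return self.data[key]                # any read finalizes

    @cached_property
    def data(self):
        # Computed once and cached on the instance afterwards.
        return self._callback(self._pending)


calls = []


def finalize(pending):
    calls.append('finalized')
    return dict({'task_serializer': 'json'}, **pending)


conf = LazyConf({}, finalize)
conf['broker_url'] = 'memory://'             # still lazy
assert not calls
assert conf['task_serializer'] == 'json'     # first read triggers it
```

This mirrors why `app.conf.some_key = value` before `config_from_object()` is cheap, while the first attribute *access* pays the full configuration-loading cost.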
class Celery:
"""Celery application.
Arguments:
main (str): Name of the main module if running as `__main__`.
This is used as the prefix for auto-generated task names.
Keyword Arguments:
broker (str): URL of the default broker used.
backend (Union[str, Type[celery.backends.base.Backend]]):
The result store backend class, or the name of the backend
class to use.
Default is the value of the :setting:`result_backend` setting.
autofinalize (bool): If set to False a :exc:`RuntimeError`
will be raised if the task registry or tasks are used before
the app is finalized.
set_as_current (bool): Make this the global current app.
include (List[str]): List of modules every worker should import.
amqp (Union[str, Type[AMQP]]): AMQP object or class name.
events (Union[str, Type[celery.app.events.Events]]): Events object or
class name.
log (Union[str, Type[Logging]]): Log object or class name.
control (Union[str, Type[celery.app.control.Control]]): Control object
or class name.
tasks (Union[str, Type[TaskRegistry]]): A task registry, or the name of
a registry class.
fixups (List[str]): List of fix-up plug-ins (e.g., see
:mod:`celery.fixups.django`).
config_source (Union[str, class]): Take configuration from a class,
or object. Attributes may include any settings described in
the documentation.
task_cls (Union[str, Type[celery.app.task.Task]]): base task class to
use. See :ref:`this section <custom-task-cls-app-wide>` for usage.
"""
#: This is deprecated, use :meth:`reduce_keys` instead
Pickler = AppPickler
SYSTEM = platforms.SYSTEM
IS_macOS, IS_WINDOWS = platforms.IS_macOS, platforms.IS_WINDOWS
#: Name of the `__main__` module. Required for standalone scripts.
#:
#: If set this will be used instead of `__main__` when automatically
#: generating task names.
main = None
#: Custom options for command-line programs.
#: See :ref:`extending-commandoptions`
user_options = None
#: Custom bootsteps to extend and modify the worker.
#: See :ref:`extending-bootsteps`.
steps = None
builtin_fixups = BUILTIN_FIXUPS
amqp_cls = 'celery.app.amqp:AMQP'
backend_cls = None
events_cls = 'celery.app.events:Events'
loader_cls = None
log_cls = 'celery.app.log:Logging'
control_cls = 'celery.app.control:Control'
task_cls = 'celery.app.task:Task'
registry_cls = 'celery.app.registry:TaskRegistry'
_fixups = None
_pool = None
_conf = None
_after_fork_registered = False
#: Signal sent when app is loading configuration.
on_configure = None
#: Signal sent after app has prepared the configuration.
on_after_configure = None
#: Signal sent after app has been finalized.
on_after_finalize = None
#: Signal sent by every new process after fork.
on_after_fork = None
def __init__(self, main=None, loader=None, backend=None,
amqp=None, events=None, log=None, control=None,
set_as_current=True, tasks=None, broker=None, include=None,
changes=None, config_source=None, fixups=None, task_cls=None,
autofinalize=True, namespace=None, strict_typing=True,
**kwargs):
self.clock = LamportClock()
self.main = main
self.amqp_cls = amqp or self.amqp_cls
self.events_cls = events or self.events_cls
self.loader_cls = loader or self._get_default_loader()
self.log_cls = log or self.log_cls
self.control_cls = control or self.control_cls
self.task_cls = task_cls or self.task_cls
self.set_as_current = set_as_current
self.registry_cls = symbol_by_name(self.registry_cls)
self.user_options = defaultdict(set)
self.steps = defaultdict(set)
self.autofinalize = autofinalize
self.namespace = namespace
self.strict_typing = strict_typing
self.configured = False
self._config_source = config_source
self._pending_defaults = deque()
self._pending_periodic_tasks = deque()
self.finalized = False
self._finalize_mutex = threading.Lock()
self._pending = deque()
self._tasks = tasks
if not isinstance(self._tasks, TaskRegistry):
self._tasks = self.registry_cls(self._tasks or {})
# If the class defines a custom __reduce_args__ we need to use
# the old way of pickling apps: pickling a list of
# args instead of the new way that pickles a dict of keywords.
self._using_v1_reduce = app_has_custom(self, '__reduce_args__')
# these options are moved to the config to
# simplify pickling of the app object.
self._preconf = changes or {}
self._preconf_set_by_auto = set()
self.__autoset('broker_url', broker)
self.__autoset('result_backend', backend)
self.__autoset('include', include)
self.__autoset('broker_use_ssl', kwargs.get('broker_use_ssl'))
self.__autoset('redis_backend_use_ssl', kwargs.get('redis_backend_use_ssl'))
self._conf = Settings(
PendingConfiguration(
self._preconf, self._finalize_pending_conf),
prefix=self.namespace,
keys=(_old_key_to_new, _new_key_to_old),
)
# - Apply fix-ups.
self.fixups = set(self.builtin_fixups) if fixups is None else fixups
# ...store fixup instances in _fixups to keep weakrefs alive.
self._fixups = [symbol_by_name(fixup)(self) for fixup in self.fixups]
if self.set_as_current:
self.set_current()
# Signals
if self.on_configure is None:
# used to be a method pre 4.0
self.on_configure = Signal(name='app.on_configure')
self.on_after_configure = Signal(
name='app.on_after_configure',
providing_args={'source'},
)
self.on_after_finalize = Signal(name='app.on_after_finalize')
self.on_after_fork = Signal(name='app.on_after_fork')
self.on_init()
_register_app(self)
def _get_default_loader(self):
# the --loader command-line argument sets the environment variable.
return (
os.environ.get('CELERY_LOADER') or
self.loader_cls or
'celery.loaders.app:AppLoader'
)
def on_init(self):
"""Optional callback called at init."""
def __autoset(self, key, value):
if value:
self._preconf[key] = value
self._preconf_set_by_auto.add(key)
def set_current(self):
"""Make this the current app for this thread."""
_set_current_app(self)
def set_default(self):
"""Make this the default app for all threads."""
set_default_app(self)
def _ensure_after_fork(self):
if not self._after_fork_registered:
self._after_fork_registered = True
if register_after_fork is not None:
register_after_fork(self, _after_fork_cleanup_app)
def close(self):
"""Clean up after the application.
Only necessary for dynamically created apps, and you should
probably use the :keyword:`with` statement instead.
Example:
>>> with Celery(set_as_current=False) as app:
... with app.connection_for_write() as conn:
... pass
"""
self._pool = None
_deregister_app(self)
def task(self, *args, **opts):
"""Decorator to create a task class out of any callable.
See :ref:`Task options<task-options>` for a list of the
arguments that can be passed to this decorator.
Examples:
.. code-block:: python
@app.task
def refresh_feed(url):
store_feed(feedparser.parse(url))
with setting extra options:
.. code-block:: python
@app.task(exchange='feeds')
def refresh_feed(url):
return store_feed(feedparser.parse(url))
Note:
App Binding: For custom apps the task decorator will return
a proxy object, so that the act of creating the task is not
performed until the task is used or the task registry is accessed.
If you're depending on binding to be deferred, then you must
not access any attributes on the returned object until the
application is fully set up (finalized).
"""
if USING_EXECV and opts.get('lazy', True):
# When using execv the task in the original module will point to a
# different app, so doing things like 'add.request' will point to
# a different task instance. This makes sure it will always use
# the task instance from the current app.
# Really need a better solution for this :(
from . import shared_task
return shared_task(*args, lazy=False, **opts)
def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
_filt = filter
def _create_task_cls(fun):
if shared:
def cons(app):
return app._task_from_fun(fun, **opts)
cons.__name__ = fun.__name__
connect_on_app_finalize(cons)
if not lazy or self.finalized:
ret = self._task_from_fun(fun, **opts)
else:
# return a proxy object that evaluates on first use
ret = PromiseProxy(self._task_from_fun, (fun,), opts,
__doc__=fun.__doc__)
self._pending.append(ret)
if _filt:
return _filt(ret)
return ret
return _create_task_cls
if len(args) == 1:
if callable(args[0]):
return inner_create_task_cls(**opts)(*args)
raise TypeError('argument 1 to @task() must be a callable')
if args:
raise TypeError(
'@task() takes exactly 1 argument ({} given)'.format(
sum([len(args), len(opts)])))
return inner_create_task_cls(**opts)
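The deferred-binding behavior described in the docstring above can be sketched in isolation. `LazyTask` below is a hypothetical, simplified stand-in for `PromiseProxy`: the task factory runs only on first attribute access, so nothing is created at decoration time.

```python
class LazyTask:
    """Hypothetical stand-in for PromiseProxy: the wrapped factory runs
    only on first attribute access, like a lazy @app.task before the
    app is finalized."""

    def __init__(self, factory):
        self._factory = factory

    def __getattr__(self, name):
        # Only called for attributes missing on the instance itself.
        if '_obj' not in self.__dict__:
            self._obj = self._factory()      # build once, then cache
        return getattr(self._obj, name)


built = []


def make_task():
    built.append(True)

    class AddTask:
        name = 'tasks.add'

        @staticmethod
        def run(x, y):
            return x + y

    return AddTask()


add = LazyTask(make_task)
assert not built                  # decoration time: nothing created
assert add.name == 'tasks.add'    # first use evaluates the factory
```

This is also why the docstring warns against touching attributes of the returned object before the app is finalized: any attribute access forces evaluation.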
def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
if not self.finalized and not self.autofinalize:
raise RuntimeError('Contract breach: app not finalized')
name = name or self.gen_task_name(fun.__name__, fun.__module__)
base = base or self.Task
if name not in self._tasks:
run = fun if bind else staticmethod(fun)
task = type(fun.__name__, (base,), dict({
'app': self,
'name': name,
'run': run,
'_decorated': True,
'__doc__': fun.__doc__,
'__module__': fun.__module__,
'__header__': staticmethod(head_from_fun(fun, bound=bind)),
'__wrapped__': run}, **options))()
# for some reason __qualname__ cannot be set in type()
# so we have to set it here.
try:
task.__qualname__ = fun.__qualname__
except AttributeError:
pass
self._tasks[task.name] = task
task.bind(self) # connects task to this app
add_autoretry_behaviour(task, **options)
else:
task = self._tasks[name]
return task
def register_task(self, task):
"""Utility for registering a task-based class.
Note:
This is here for compatibility with old Celery 1.0
style task classes, you should not need to use this for
new projects.
"""
task = inspect.isclass(task) and task() or task
if not task.name:
task_cls = type(task)
task.name = self.gen_task_name(
task_cls.__name__, task_cls.__module__)
add_autoretry_behaviour(task)
self.tasks[task.name] = task
task._app = self
task.bind(self)
return task
def gen_task_name(self, name, module):
return gen_task_name(self, name, module)
def finalize(self, auto=False):
"""Finalize the app.
This loads built-in tasks, evaluates pending task decorators,
reads configuration, etc.
"""
with self._finalize_mutex:
if not self.finalized:
if auto and not self.autofinalize:
raise RuntimeError('Contract breach: app not finalized')
self.finalized = True
_announce_app_finalized(self)
pending = self._pending
while pending:
maybe_evaluate(pending.popleft())
for task in self._tasks.values():
task.bind(self)
self.on_after_finalize.send(sender=self)
def add_defaults(self, fun):
"""Add default configuration from dict ``d``.
If the argument is a callable function then it will be regarded
as a promise, and it won't be loaded until the configuration is
actually needed.
This method can be compared to:
.. code-block:: pycon
>>> celery.conf.update(d)
with a difference that 1) no copy will be made and 2) the dict will
not be transferred when the worker spawns child processes, so
it's important that the same configuration happens at import time
when pickle restores the object on the other side.
"""
if not callable(fun):
d, fun = fun, lambda: d
if self.configured:
return self._conf.add_defaults(fun())
self._pending_defaults.append(fun)
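The two call styles `add_defaults()` accepts, and the deferral until configuration time, can be sketched with plain dicts (all names below are illustrative, not Celery's internals):

```python
pending_defaults = []
conf = {}
state = {'configured': False}


def add_defaults(fun):
    # Same two call styles as above: a plain dict, or a zero-argument
    # callable treated as a promise and evaluated later.
    if not callable(fun):
        d, fun = fun, lambda: d
    if state['configured']:
        conf.update(fun())
        return
    pending_defaults.append(fun)


add_defaults({'timezone': 'UTC'})
assert conf == {}                         # promise stored, not loaded
state['configured'] = True
while pending_defaults:                   # what _load_config() does
    conf.update(pending_defaults.pop(0)())
```

Note the `d, fun = fun, lambda: d` trick: the tuple assignment binds `d` before the lambda is ever called, so the dict is captured by closure.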
def config_from_object(self, obj,
silent=False, force=False, namespace=None):
"""Read configuration from object.
Object is either an actual object or the name of a module to import.
Example:
>>> celery.config_from_object('myapp.celeryconfig')
>>> from myapp import celeryconfig
>>> celery.config_from_object(celeryconfig)
Arguments:
silent (bool): If true then import errors will be ignored.
force (bool): Force reading configuration immediately.
By default the configuration will be read only when required.
"""
self._config_source = obj
self.namespace = namespace or self.namespace
if force or self.configured:
self._conf = None
if self.loader.config_from_object(obj, silent=silent):
return self.conf
def config_from_envvar(self, variable_name, silent=False, force=False):
"""Read configuration from environment variable.
The value of the environment variable must be the name
of a module to import.
Example:
>>> os.environ['CELERY_CONFIG_MODULE'] = 'myapp.celeryconfig'
>>> celery.config_from_envvar('CELERY_CONFIG_MODULE')
"""
module_name = os.environ.get(variable_name)
if not module_name:
if silent:
return False
raise ImproperlyConfigured(
ERR_ENVVAR_NOT_SET.strip().format(variable_name))
return self.config_from_object(module_name, silent=silent, force=force)
def config_from_cmdline(self, argv, namespace='celery'):
self._conf.update(
self.loader.cmdline_config_parser(argv, namespace)
)
def setup_security(self, allowed_serializers=None, key=None, cert=None,
store=None, digest=DEFAULT_SECURITY_DIGEST,
serializer='json'):
"""Setup the message-signing serializer.
This will affect all application instances (a global operation).
Disables untrusted serializers and if configured to use the ``auth``
serializer will register the ``auth`` serializer with the provided
settings into the Kombu serializer registry.
Arguments:
allowed_serializers (Set[str]): List of serializer names, or
content_types that should be exempt from being disabled.
key (str): Name of private key file to use.
Defaults to the :setting:`security_key` setting.
cert (str): Name of certificate file to use.
Defaults to the :setting:`security_certificate` setting.
store (str): Directory containing certificates.
Defaults to the :setting:`security_cert_store` setting.
digest (str): Digest algorithm used when signing messages.
Default is ``sha256``.
serializer (str): Serializer used to encode messages after
they've been signed. See :setting:`task_serializer` for
the serializers supported. Default is ``json``.
"""
from celery.security import setup_security
return setup_security(allowed_serializers, key, cert,
store, digest, serializer, app=self)
def autodiscover_tasks(self, packages=None,
related_name='tasks', force=False):
"""Auto-discover task modules.
        Searches a list of packages for a "tasks.py" module (or the
        module named by the ``related_name`` argument).
If the name is empty, this will be delegated to fix-ups (e.g., Django).
For example if you have a directory layout like this:
.. code-block:: text
foo/__init__.py
tasks.py
models.py
bar/__init__.py
tasks.py
models.py
baz/__init__.py
models.py
Then calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` will
result in the modules ``foo.tasks`` and ``bar.tasks`` being imported.
Arguments:
packages (List[str]): List of packages to search.
This argument may also be a callable, in which case the
value returned is used (for lazy evaluation).
related_name (Optional[str]): The name of the module to find. Defaults
to "tasks": meaning "look for 'module.tasks' for every
module in ``packages``.". If ``None`` will only try to import
the package, i.e. "look for 'module'".
force (bool): By default this call is lazy so that the actual
auto-discovery won't happen until an application imports
the default modules. Forcing will cause the auto-discovery
to happen immediately.
"""
if force:
return self._autodiscover_tasks(packages, related_name)
signals.import_modules.connect(starpromise(
self._autodiscover_tasks, packages, related_name,
), weak=False, sender=self)
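The name expansion performed by the auto-discovery above can be sketched with the standard library alone; `discover` is a hypothetical helper, with stdlib modules standing in for `foo.tasks`-style lookups:

```python
import importlib
import importlib.util


def discover(packages, related_name='tasks'):
    """Map each package to '<package>.<related_name>' and import it
    when that module exists; ``packages`` may itself be a callable,
    mirroring the lazy-argument support above."""
    names = packages() if callable(packages) else packages
    found = []
    for pkg in names:
        modname = pkg if related_name is None else f'{pkg}.{related_name}'
        try:
            spec = importlib.util.find_spec(modname)
        except ModuleNotFoundError:
            spec = None                  # the package itself is missing
        if spec is not None:
            found.append(importlib.import_module(modname))
    return found


# 'json' plays the role of a project package with a submodule to find.
mods = discover(lambda: ['json', 'no_such_pkg'], related_name='decoder')
assert [m.__name__ for m in mods] == ['json.decoder']
```

Packages without the target module are simply skipped, which matches the docstring's `foo`/`bar`/`baz` example where `baz` contributes nothing.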
def _autodiscover_tasks(self, packages, related_name, **kwargs):
if packages:
return self._autodiscover_tasks_from_names(packages, related_name)
return self._autodiscover_tasks_from_fixups(related_name)
def _autodiscover_tasks_from_names(self, packages, related_name):
# packages argument can be lazy
return self.loader.autodiscover_tasks(
packages() if callable(packages) else packages, related_name,
)
def _autodiscover_tasks_from_fixups(self, related_name):
return self._autodiscover_tasks_from_names([
pkg for fixup in self._fixups
for pkg in fixup.autodiscover_tasks()
if hasattr(fixup, 'autodiscover_tasks')
], related_name=related_name)
def send_task(self, name, args=None, kwargs=None, countdown=None,
eta=None, task_id=None, producer=None, connection=None,
router=None, result_cls=None, expires=None,
publisher=None, link=None, link_error=None,
add_to_parent=True, group_id=None, group_index=None,
retries=0, chord=None,
reply_to=None, time_limit=None, soft_time_limit=None,
root_id=None, parent_id=None, route_name=None,
shadow=None, chain=None, task_type=None, **options):
"""Send task by name.
Supports the same arguments as :meth:`@-Task.apply_async`.
Arguments:
name (str): Name of task to call (e.g., `"tasks.add"`).
result_cls (AsyncResult): Specify custom result class.
"""
parent = have_parent = None
amqp = self.amqp
task_id = task_id or uuid()
producer = producer or publisher # XXX compat
router = router or amqp.router
conf = self.conf
if conf.task_always_eager: # pragma: no cover
warnings.warn(AlwaysEagerIgnored(
'task_always_eager has no effect on send_task',
), stacklevel=2)
ignored_result = options.pop('ignore_result', False)
options = router.route(
options, route_name or name, args, kwargs, task_type)
if not root_id or not parent_id:
parent = self.current_worker_task
if parent:
if not root_id:
root_id = parent.request.root_id or parent.request.id
if not parent_id:
parent_id = parent.request.id
if conf.task_inherit_parent_priority:
options.setdefault('priority',
parent.request.delivery_info.get('priority'))
message = amqp.create_task_message(
task_id, name, args, kwargs, countdown, eta, group_id, group_index,
expires, retries, chord,
maybe_list(link), maybe_list(link_error),
reply_to or self.oid, time_limit, soft_time_limit,
self.conf.task_send_sent_event,
root_id, parent_id, shadow, chain,
argsrepr=options.get('argsrepr'),
kwargsrepr=options.get('kwargsrepr'),
)
if connection:
producer = amqp.Producer(connection, auto_declare=False)
with self.producer_or_acquire(producer) as P:
with P.connection._reraise_as_library_errors():
if not ignored_result:
self.backend.on_task_call(P, task_id)
amqp.send_task_message(P, name, message, **options)
result = (result_cls or self.AsyncResult)(task_id)
# We avoid using the constructor since a custom result class
# can be used, in which case the constructor may still use
# the old signature.
result.ignored = ignored_result
if add_to_parent:
if not have_parent:
parent, have_parent = self.current_worker_task, True
if parent:
parent.add_trail(result)
return result
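The root/parent id bookkeeping near the top of `send_task` can be isolated into a small sketch. `Request` and `resolve_ids` are hypothetical names; the branching mirrors the `if not root_id or not parent_id` block above:

```python
class Request:
    """Minimal stand-in for a worker task request."""

    def __init__(self, id, root_id=None):
        self.id = id
        self.root_id = root_id


def resolve_ids(parent_request, root_id=None, parent_id=None):
    # A task sent from inside a worker inherits its lineage from the
    # currently executing request unless ids were given explicitly.
    if parent_request is not None:
        if not root_id:
            root_id = parent_request.root_id or parent_request.id
        if not parent_id:
            parent_id = parent_request.id
    return root_id, parent_id


# Top of a workflow: the first task becomes its own root.
assert resolve_ids(Request('t1')) == ('t1', 't1')
# A child sent from t1 keeps t1 as root, t2 as parent.
assert resolve_ids(Request('t2', root_id='t1')) == ('t1', 't2')
```

Explicit `root_id`/`parent_id` arguments always win, which is what lets canvas primitives thread ids through a chain regardless of which worker sends each message.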
def connection_for_read(self, url=None, **kwargs):
"""Establish connection used for consuming.
See Also:
:meth:`connection` for supported arguments.
"""
return self._connection(url or self.conf.broker_read_url, **kwargs)
def connection_for_write(self, url=None, **kwargs):
"""Establish connection used for producing.
See Also:
:meth:`connection` for supported arguments.
"""
return self._connection(url or self.conf.broker_write_url, **kwargs)
def connection(self, hostname=None, userid=None, password=None,
virtual_host=None, port=None, ssl=None,
connect_timeout=None, transport=None,
transport_options=None, heartbeat=None,
login_method=None, failover_strategy=None, **kwargs):
"""Establish a connection to the message broker.
Please use :meth:`connection_for_read` and
:meth:`connection_for_write` instead, to convey the intent
of use for this connection.
Arguments:
url: Either the URL or the hostname of the broker to use.
hostname (str): URL, Hostname/IP-address of the broker.
                If a URL is used, then the other arguments below will
                be taken from the URL instead.
userid (str): Username to authenticate as.
            password (str): Password to authenticate with.
virtual_host (str): Virtual host to use (domain).
port (int): Port to connect to.
ssl (bool, Dict): Defaults to the :setting:`broker_use_ssl`
setting.
transport (str): defaults to the :setting:`broker_transport`
setting.
transport_options (Dict): Dictionary of transport specific options.
heartbeat (int): AMQP Heartbeat in seconds (``pyamqp`` only).
login_method (str): Custom login method to use (AMQP only).
failover_strategy (str, Callable): Custom failover strategy.
**kwargs: Additional arguments to :class:`kombu.Connection`.
Returns:
kombu.Connection: the lazy connection instance.
"""
return self.connection_for_write(
hostname or self.conf.broker_write_url,
userid=userid, password=password,
virtual_host=virtual_host, port=port, ssl=ssl,
connect_timeout=connect_timeout, transport=transport,
transport_options=transport_options, heartbeat=heartbeat,
login_method=login_method, failover_strategy=failover_strategy,
**kwargs
)
def _connection(self, url, userid=None, password=None,
virtual_host=None, port=None, ssl=None,
connect_timeout=None, transport=None,
transport_options=None, heartbeat=None,
login_method=None, failover_strategy=None, **kwargs):
conf = self.conf
return self.amqp.Connection(
url,
userid or conf.broker_user,
password or conf.broker_password,
virtual_host or conf.broker_vhost,
port or conf.broker_port,
transport=transport or conf.broker_transport,
ssl=self.either('broker_use_ssl', ssl),
heartbeat=heartbeat,
login_method=login_method or conf.broker_login_method,
failover_strategy=(
failover_strategy or conf.broker_failover_strategy
),
transport_options=dict(
conf.broker_transport_options, **transport_options or {}
),
connect_timeout=self.either(
'broker_connection_timeout', connect_timeout
),
)
broker_connection = connection
def _acquire_connection(self, pool=True):
"""Helper for :meth:`connection_or_acquire`."""
if pool:
return self.pool.acquire(block=True)
return self.connection_for_write()
def connection_or_acquire(self, connection=None, pool=True, *_, **__):
"""Context used to acquire a connection from the pool.
For use within a :keyword:`with` statement to get a connection
from the pool if one is not already provided.
Arguments:
connection (kombu.Connection): If not provided, a connection
will be acquired from the connection pool.
"""
return FallbackContext(connection, self._acquire_connection, pool=pool)
default_connection = connection_or_acquire # XXX compat
def producer_or_acquire(self, producer=None):
"""Context used to acquire a producer from the pool.
For use within a :keyword:`with` statement to get a producer
        from the pool if one is not already provided.
Arguments:
producer (kombu.Producer): If not provided, a producer
will be acquired from the producer pool.
"""
return FallbackContext(
producer, self.producer_pool.acquire, block=True,
)
default_producer = producer_or_acquire # XXX compat
def prepare_config(self, c):
"""Prepare configuration before it is merged with the defaults."""
return find_deprecated_settings(c)
def now(self):
"""Return the current time and date as a datetime."""
now_in_utc = to_utc(datetime.utcnow())
return now_in_utc.astimezone(self.timezone)
def select_queues(self, queues=None):
"""Select subset of queues.
Arguments:
queues (Sequence[str]): a list of queue names to keep.
"""
return self.amqp.queues.select(queues)
def either(self, default_key, *defaults):
"""Get key from configuration or use default values.
        Fallback to the value of a configuration key if none of the
        `*defaults` are true.
"""
return first(None, [
first(None, defaults), starpromise(self.conf.get, default_key),
])
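The fallback rule can be sketched eagerly (the real version wraps the configuration lookup in `starpromise` so the key is only read if every default is `None`); this standalone `either` is an illustrative simplification:

```python
def either(conf, default_key, *defaults):
    """First value in ``defaults`` that is not None wins; otherwise
    fall back to the configuration key."""
    for value in defaults:
        if value is not None:
            return value
    return conf.get(default_key)


conf = {'broker_connection_timeout': 4.0}
# No explicit value given: the setting is used.
assert either(conf, 'broker_connection_timeout', None) == 4.0
# An explicit argument overrides the setting.
assert either(conf, 'broker_connection_timeout', 10.0) == 10.0
```

This is the pattern `_connection()` above relies on for `ssl` and `connect_timeout`: caller argument first, configured setting second.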
def bugreport(self):
"""Return information useful in bug reports."""
return bugreport(self)
def _get_backend(self):
backend, url = backends.by_url(
self.backend_cls or self.conf.result_backend,
self.loader)
return backend(app=self, url=url)
def _finalize_pending_conf(self):
"""Get config value by key and finalize loading the configuration.
Note:
This is used by PendingConfiguration:
as soon as you access a key the configuration is read.
"""
conf = self._conf = self._load_config()
return conf
def _load_config(self):
if isinstance(self.on_configure, Signal):
self.on_configure.send(sender=self)
else:
# used to be a method pre 4.0
self.on_configure()
if self._config_source:
self.loader.config_from_object(self._config_source)
self.configured = True
settings = detect_settings(
self.prepare_config(self.loader.conf), self._preconf,
ignore_keys=self._preconf_set_by_auto, prefix=self.namespace,
)
if self._conf is not None:
# replace in place, as someone may have referenced app.conf,
# done some changes, accessed a key, and then try to make more
# changes to the reference and not the finalized value.
self._conf.swap_with(settings)
else:
self._conf = settings
# load lazy config dict initializers.
pending_def = self._pending_defaults
while pending_def:
self._conf.add_defaults(maybe_evaluate(pending_def.popleft()()))
# load lazy periodic tasks
pending_beat = self._pending_periodic_tasks
while pending_beat:
self._add_periodic_task(*pending_beat.popleft())
self.on_after_configure.send(sender=self, source=self._conf)
return self._conf
def _after_fork(self):
self._pool = None
try:
self.__dict__['amqp']._producer_pool = None
except (AttributeError, KeyError):
pass
self.on_after_fork.send(sender=self)
def signature(self, *args, **kwargs):
"""Return a new :class:`~celery.Signature` bound to this app."""
kwargs['app'] = self
return self._canvas.signature(*args, **kwargs)
def add_periodic_task(self, schedule, sig,
args=(), kwargs=(), name=None, **opts):
key, entry = self._sig_to_periodic_task_entry(
schedule, sig, args, kwargs, name, **opts)
if self.configured:
self._add_periodic_task(key, entry)
else:
self._pending_periodic_tasks.append((key, entry))
return key
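The configured-versus-pending branch above can be sketched with plain containers (the names `beat_schedule`, `pending_periodic`, and `app_configured` are illustrative stand-ins for the app's internal state):

```python
beat_schedule = {}
pending_periodic = []
app_configured = False


def add_periodic_task(schedule, task_name, name=None):
    # Mirrors the branch above: queue the entry until the app is
    # configured, then write straight into the beat schedule.
    key = name or task_name
    entry = {'schedule': schedule, 'task': task_name}
    if app_configured:
        beat_schedule[key] = entry
    else:
        pending_periodic.append((key, entry))
    return key


add_periodic_task(30.0, 'tasks.cleanup')
assert beat_schedule == {}               # deferred: not configured yet
app_configured = True
while pending_periodic:                  # what _load_config() does
    key, entry = pending_periodic.pop(0)
    beat_schedule[key] = entry
```

This is why periodic tasks registered at import time only appear in the schedule once the configuration has been finalized.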
def _sig_to_periodic_task_entry(self, schedule, sig,
args=(), kwargs=None, name=None, **opts):
kwargs = {} if not kwargs else kwargs
sig = (sig.clone(args, kwargs)
if isinstance(sig, abstract.CallableSignature)
else self.signature(sig.name, args, kwargs))
return name or repr(sig), {
'schedule': schedule,
'task': sig.name,
'args': sig.args,
'kwargs': sig.kwargs,
'options': dict(sig.options, **opts),
}
def _add_periodic_task(self, key, entry):
self._conf.beat_schedule[key] = entry
def create_task_cls(self):
"""Create a base task class bound to this app."""
return self.subclass_with_self(
self.task_cls, name='Task', attribute='_app',
keep_reduce=True, abstract=True,
)
def subclass_with_self(self, Class, name=None, attribute='app',
reverse=None, keep_reduce=False, **kw):
"""Subclass an app-compatible class.
App-compatible means that the class has a class attribute that
provides the default app it should use, for example:
``class Foo: app = None``.
Arguments:
Class (type): The app-compatible class to subclass.
name (str): Custom name for the target class.
attribute (str): Name of the attribute holding the app,
Default is 'app'.
reverse (str): Reverse path to this object used for pickling
purposes. For example, to get ``app.AsyncResult``,
use ``"AsyncResult"``.
keep_reduce (bool): If enabled a custom ``__reduce__``
implementation won't be provided.
"""
Class = symbol_by_name(Class)
reverse = reverse if reverse else Class.__name__
def __reduce__(self):
return _unpickle_appattr, (reverse, self.__reduce_args__())
attrs = dict(
{attribute: self},
__module__=Class.__module__,
__doc__=Class.__doc__,
**kw)
if not keep_reduce:
attrs['__reduce__'] = __reduce__
return type(name or Class.__name__, (Class,), attrs)
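Stripped of the pickling support, the core of `subclass_with_self` is a `type()` call that pre-binds the app attribute. The sketch below uses a hypothetical free function and a dummy app object to show the effect:

```python
def subclass_with_self(app, Class, name=None, attribute='app', **kw):
    """Bind a default app onto an app-compatible class by subclassing
    it with the attribute pre-set (sketch: no __reduce__ handling)."""
    attrs = dict({attribute: app}, __module__=Class.__module__, **kw)
    return type(name or Class.__name__, (Class,), attrs)


class Foo:
    """App-compatible: carries a class-level default app."""
    app = None

    def which(self):
        return self.app


my_app = object()
BoundFoo = subclass_with_self(my_app, Foo)
assert BoundFoo().which() is my_app      # default app is baked in
assert BoundFoo.__name__ == 'Foo'
```

This is the mechanism behind `app.Task`, `app.AsyncResult`, `app.Worker`, and the other cached properties below: each is the upstream class subclassed with this app as its default.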
def _rgetattr(self, path):
return attrgetter(path)(self)
def __enter__(self):
return self
def __exit__(self, *exc_info):
self.close()
def __repr__(self):
return '<{} {}>'.format(type(self).__name__, appstr(self))
def __reduce__(self):
if self._using_v1_reduce:
return self.__reduce_v1__()
return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
def __reduce_v1__(self):
# Reduce only pickles the configuration changes,
# so the default configuration doesn't have to be passed
# between processes.
return (
_unpickle_app,
(self.__class__, self.Pickler) + self.__reduce_args__(),
)
def __reduce_keys__(self):
"""Keyword arguments used to reconstruct the object when unpickling."""
return {
'main': self.main,
'changes':
self._conf.changes if self.configured else self._preconf,
'loader': self.loader_cls,
'backend': self.backend_cls,
'amqp': self.amqp_cls,
'events': self.events_cls,
'log': self.log_cls,
'control': self.control_cls,
'fixups': self.fixups,
'config_source': self._config_source,
'task_cls': self.task_cls,
'namespace': self.namespace,
}
def __reduce_args__(self):
"""Deprecated method, please use :meth:`__reduce_keys__` instead."""
return (self.main, self._conf.changes if self.configured else {},
self.loader_cls, self.backend_cls, self.amqp_cls,
self.events_cls, self.log_cls, self.control_cls,
False, self._config_source)
    @cached_property
    def Worker(self):
        """Worker application.

        See Also:
            :class:`~@Worker`.
        """
        return self.subclass_with_self('celery.apps.worker:Worker')

    @cached_property
    def WorkController(self):
        """Embeddable worker.

        See Also:
            :class:`~@WorkController`.
        """
        return self.subclass_with_self('celery.worker:WorkController')

    @cached_property
    def Beat(self):
        """:program:`celery beat` scheduler application.

        See Also:
            :class:`~@Beat`.
        """
        return self.subclass_with_self('celery.apps.beat:Beat')

    @cached_property
    def Task(self):
        """Base task class for this app."""
        return self.create_task_cls()

    @cached_property
    def annotations(self):
        return prepare_annotations(self.conf.task_annotations)

    @cached_property
    def AsyncResult(self):
        """Create new result instance.

        See Also:
            :class:`celery.result.AsyncResult`.
        """
        return self.subclass_with_self('celery.result:AsyncResult')

    @cached_property
    def ResultSet(self):
        return self.subclass_with_self('celery.result:ResultSet')

    @cached_property
    def GroupResult(self):
        """Create new group result instance.

        See Also:
            :class:`celery.result.GroupResult`.
        """
        return self.subclass_with_self('celery.result:GroupResult')
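Each of these accessors returns a *subclass* of the target class with the current app baked in, so instances never need the app passed explicitly. A minimal sketch of that idea, with hypothetical `App`/`AsyncResult` stand-ins (the real `subclass_with_self` resolves dotted paths and does more bookkeeping):

```python
class AsyncResult:
    app = None  # filled in by the generated subclass

    def __init__(self, task_id):
        self.task_id = task_id


class App:
    def subclass_with_self(self, base, name=None):
        # type(name, bases, namespace) builds a new class at runtime,
        # binding this app instance as a class attribute.
        return type(name or base.__name__, (base,), {'app': self})


app = App()
BoundResult = app.subclass_with_self(AsyncResult)
result = BoundResult('abc-123')
print(result.app is app, result.task_id)  # True abc-123
```

Because the accessors are `@cached_property`, the subclass is built once per app and reused on every later access.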
    @property
    def pool(self):
        """Broker connection pool: :class:`~@pool`.

        Note:
            This attribute is not related to the workers' concurrency pool.
        """
        if self._pool is None:
            self._ensure_after_fork()
            limit = self.conf.broker_pool_limit
            pools.set_limit(limit)
            self._pool = pools.connections[self.connection_for_write()]
        return self._pool

    @property
    def current_task(self):
        """Instance of task being executed, or :const:`None`."""
        return _task_stack.top

    @property
    def current_worker_task(self):
        """The task currently being executed by a worker, or :const:`None`.

        Differs from :data:`current_task` in that it's not affected
        by tasks calling other tasks directly, or eagerly.
        """
        return get_current_worker_task()
    @cached_property
    def oid(self):
        """Universally unique identifier for this app."""
        # since 4.0: thread.get_ident() is not included when
        # generating the process id. This is due to how the RPC
        # backend now dedicates a single thread to receive results,
        # which would not work if each thread has a separate id.
        return oid_from(self, threads=False)
    @cached_property
    def amqp(self):
        """AMQP related functionality: :class:`~@amqp`."""
        return instantiate(self.amqp_cls, app=self)

    @cached_property
    def backend(self):
        """Current backend instance."""
        return self._get_backend()

    @property
    def conf(self):
        """Current configuration."""
        if self._conf is None:
            self._conf = self._load_config()
        return self._conf

    @conf.setter
    def conf(self, d):  # noqa
        self._conf = d
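`conf` is a hand-rolled lazy property rather than a `@cached_property`: the expensive load runs on first access only, and the setter lets callers swap in a whole configuration. A minimal sketch of the pattern with hypothetical stand-ins (the real `_load_config` merges defaults, pre-configuration, and config sources):

```python
class App:
    def __init__(self):
        self._conf = None
        self.loads = 0  # counter, just to show the load happens once

    def _load_config(self):
        self.loads += 1  # stands in for the real, expensive load
        return {'timezone': None, 'enable_utc': True}

    @property
    def conf(self):
        if self._conf is None:
            self._conf = self._load_config()
        return self._conf

    @conf.setter
    def conf(self, d):
        self._conf = d


app = App()
app.conf            # first access triggers the load
app.conf            # cached afterwards
print(app.loads)    # 1
app.conf = {'enable_utc': False}  # setter replaces it wholesale
```

Using a plain property keeps the setter straightforward; `@cached_property` would cache the getter result but make "replace the whole config later" awkward.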
    @cached_property
    def control(self):
        """Remote control: :class:`~@control`."""
        return instantiate(self.control_cls, app=self)

    @cached_property
    def events(self):
        """Consuming and sending events: :class:`~@events`."""
        return instantiate(self.events_cls, app=self)

    @cached_property
    def loader(self):
        """Current loader instance."""
        return get_loader_cls(self.loader_cls)(app=self)

    @cached_property
    def log(self):
        """Logging: :class:`~@log`."""
        return instantiate(self.log_cls, app=self)

    @cached_property
    def _canvas(self):
        from celery import canvas
        return canvas
    @cached_property
    def tasks(self):
        """Task registry.

        Warning:
            Accessing this attribute will also auto-finalize the app.
        """
        self.finalize(auto=True)
        return self._tasks

    @property
    def producer_pool(self):
        return self.amqp.producer_pool

    def uses_utc_timezone(self):
        """Check if the application uses the UTC timezone."""
        return self.timezone == timezone.utc

    @cached_property
    def timezone(self):
        """Current timezone for this app.

        This is a cached property taking the time zone from the
        :setting:`timezone` setting.
        """
        conf = self.conf
        if not conf.timezone:
            if conf.enable_utc:
                return timezone.utc
            else:
                return timezone.local
        return timezone.get_timezone(conf.timezone)
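The resolution order in `timezone` is: an explicit `timezone` setting wins, then `enable_utc` selects UTC, then the local timezone is the fallback. A standalone sketch of that logic using the stdlib's `zoneinfo` (the real code goes through kombu/celery timezone helpers; `resolve_timezone` is a hypothetical name):

```python
from datetime import datetime, timezone as dt_timezone
from zoneinfo import ZoneInfo


def resolve_timezone(tz_setting, enable_utc):
    # Explicit setting takes precedence over enable_utc.
    if not tz_setting:
        if enable_utc:
            return dt_timezone.utc
        # Local timezone of the current machine.
        return datetime.now().astimezone().tzinfo
    return ZoneInfo(tz_setting)


print(resolve_timezone(None, True))            # UTC
print(resolve_timezone('Europe/Berlin', True)) # explicit zone wins over UTC
```

Note that because `timezone` is a `@cached_property`, changing the setting after first access will not be reflected until the cached value is cleared.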

App = Celery  # noqa: E305 XXX compat