-
-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* add kafka broker docs * modify config options to be more accurate * add additional documentation on findings * update config and add limitations * sasl
- Loading branch information
Showing
2 changed files
with
85 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
.. _broker-kafka: | ||
|
||
============= | ||
Using Kafka | ||
============= | ||
|
||
.. _broker-Kafka-installation: | ||
|
||
Configuration | ||
============= | ||
|
||
For celeryconfig.py: | ||
|
||
.. code-block:: python | ||
import os | ||
task_serializer = 'json' | ||
broker_transport_options = { | ||
# "allow_create_topics": True, | ||
} | ||
broker_connection_retry_on_startup = True | ||
# For using SQLAlchemy as the backend | ||
# result_backend = 'db+postgresql://postgres:example@localhost/postgres' | ||
broker_transport_options.update({ | ||
"security_protocol": "SASL_SSL", | ||
"sasl_mechanism": "SCRAM-SHA-512", | ||
}) | ||
sasl_username = os.environ["SASL_USERNAME"] | ||
sasl_password = os.environ["SASL_PASSWORD"] | ||
broker_url = f"confluentkafka://{sasl_username}:{sasl_password}@broker:9094" | ||
kafka_admin_config = { | ||
"sasl.username": sasl_username, | ||
"sasl.password": sasl_password, | ||
} | ||
kafka_common_config = { | ||
"sasl.username": sasl_username, | ||
"sasl.password": sasl_password, | ||
"security.protocol": "SASL_SSL", | ||
"sasl.mechanism": "SCRAM-SHA-512", | ||
"bootstrap_servers": "broker:9094", | ||
} | ||
Please note that "allow_create_topics" is needed if the topic does not exist | ||
yet but is not necessary otherwise. | ||
|
||
For tasks.py: | ||
|
||
.. code-block:: python | ||
from celery import Celery | ||
app = Celery('tasks') | ||
app.config_from_object('celeryconfig') | ||
@app.task | ||
def add(x, y): | ||
return x + y | ||
Auth | ||
==== | ||
|
||
See above. The SASL username and password are passed in as environment variables. | ||
|
||
Further Info | ||
============ | ||
|
||
Celery queues get routed to Kafka topics. For example, if a queue is named "add_queue", | ||
then a topic named "add_queue" will be created/used in Kafka. | ||
|
||
For canvas, when using a backend that supports it, the typical mechanisms like | ||
chain, group, and chord seem to work. | ||
|
||
|
||
Limitations | ||
=========== | ||
|
||
Currently, using Kafka as a broker means that only one worker can be used. | ||
See https://github.com/celery/kombu/issues/1785. |