Skip to content

Commit

Permalink
[AIRFLOW-966] Make celery broker_transport_options configurable
Browse files Browse the repository at this point in the history
Required for changing visibility timeout and other options required
for Redis/SQS.
  • Loading branch information
bolkedebruin committed Dec 5, 2017
1 parent 1359d87 commit be79f87
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 1 deletion.
7 changes: 7 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@ default_queue = default
# Import path for celery configuration options
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG

[celery_broker_transport_options]
# The visibility timeout defines the number of seconds to wait for the worker
# to acknowledge the task before the message is redelivered to another worker.
# Make sure to increase the visibility timeout to match the time of the longest
# ETA you’re planning to use. Especially important in case of using Redis or SQS
visibility_timeout = 21600

[dask]
# This section only applies if you are using the DaskExecutor in
# [core] section above
Expand Down
6 changes: 5 additions & 1 deletion airflow/config_templates/default_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
from airflow.utils.log.logging_mixin import LoggingMixin


broker_transport_options = configuration.getsection('celery_broker_transport_options')
if broker_transport_options is None:
broker_transport_options = {'visibility_timeout': 21600}

DEFAULT_CELERY_CONFIG = {
'accept_content': ['json', 'pickle'],
'event_serializer': 'json',
Expand All @@ -28,7 +32,7 @@
'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'),
'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'),
'broker_url': configuration.get('celery', 'BROKER_URL'),
'broker_transport_options': {'visibility_timeout': 21600},
'broker_transport_options': {'visibility_timeout': broker_transport_options},
'result_backend': configuration.get('celery', 'CELERY_RESULT_BACKEND'),
'worker_concurrency': configuration.getint('celery', 'CELERYD_CONCURRENCY'),
}
Expand Down
10 changes: 10 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ def read(self, filenames):
ConfigParser.read(self, filenames)
self._validate()

def getsection(self, section):
if section in self._sections:
return self._sections[section]

return None

def as_dict(self, display_source=False, display_sensitive=False):
"""
Returns the current configuration as an OrderedDict of OrderedDicts.
Expand Down Expand Up @@ -423,6 +429,10 @@ def getint(section, key):
return conf.getint(section, key)


def getsection(section):
return conf.getsection(section)


def has_option(section, key):
return conf.has_option(section, key)

Expand Down
5 changes: 5 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ Note that you can also run "Celery Flower", a web UI built on top of Celery,
to monitor your workers. You can use the shortcut command ``airflow flower``
to start a Flower web server.

Some caveats:

- Make sure to use a database backed result backend
- Make sure to set a visibility timeout in [celery_broker_transport_options] that exceeds the ETA of your longest running task
- Tasks can and consume resources, make sure your worker as enough resources to run `celeryd_concurrency` tasks

Scaling Out with Dask
'''''''''''''''''''''
Expand Down

0 comments on commit be79f87

Please sign in to comment.