Merge pull request #109 from airbnb/policy
Adding a policy hook to allow setting up system-wide policy
mistercrunch committed Jul 9, 2015
2 parents d2dd459 + 9368c71 commit 03eb7ed
Showing 2 changed files with 38 additions and 4 deletions.
1 change: 1 addition & 0 deletions airflow/models.py
@@ -820,6 +820,7 @@ def signal_handler(signum, frame):
 
             self.render_templates()
             context = self.get_template_context()
+            settings.policy(task_copy)
             task_copy.pre_execute(context=context)
             task_copy.execute(context=context)
             task_copy.post_execute(context=context)
41 changes: 37 additions & 4 deletions airflow/settings.py
@@ -36,7 +36,40 @@
     sessionmaker(autocommit=False, autoflush=False, bind=engine))
 
 # can't move this to configuration due to ConfigParser interpolation
-LOG_FORMAT = \
-    '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s'
-SIMPLE_LOG_FORMAT = \
-    '%(asctime)s %(levelname)s - %(message)s'
+LOG_FORMAT = (
+    '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
+SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s'
+
+
+def policy(task_instance):
+    """
+    This policy setting allows altering task instances right before they
+    are executed. It allows administrators to rewire some task parameters.
+
+    Note that the ``TaskInstance`` object has an attribute ``task`` pointing
+    to its related task object, which in turn has a reference to the DAG
+    object, so you can use the attributes of any of these to define your
+    policy.
+
+    To define a policy, add an ``airflow_local_settings`` module
+    to your PYTHONPATH that defines this ``policy`` function. It receives
+    a ``TaskInstance`` object and can alter it where needed.
+
+    Here are a few examples of how this can be useful:
+
+    * You could enforce a specific queue (say the ``spark`` queue)
+      for tasks using the ``SparkOperator`` to make sure that these
+      task instances get wired to the right workers
+    * You could force all task instances running on an
+      ``execution_date`` older than a week to run in a ``backfill``
+      pool.
+    * ...
+    """
+    pass
+
+
+try:
+    from airflow_local_settings import *
+    logging.info("Loaded airflow_local_settings.")
+except ImportError:
+    pass
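
For reference, below is a minimal, hypothetical sketch of what an airflow_local_settings module using this hook might look like. The module name and the policy signature come from the docstring above; the SparkOperator check, the 'spark' queue, and the 'backfill' pool are illustrative assumptions taken from the docstring's examples. Since models.py invokes the hook with task_copy, the sketch also guards against receiving either a task or a TaskInstance.

# airflow_local_settings.py -- hypothetical example; place it on PYTHONPATH so
# that the `from airflow_local_settings import *` above picks it up.
from datetime import datetime, timedelta


def policy(task_instance):
    # The docstring says a TaskInstance is passed in, while models.py calls the
    # hook with task_copy; fall back to the object itself if it has no
    # `task` attribute.
    task = getattr(task_instance, 'task', task_instance)

    # Route tasks built from a (hypothetical) SparkOperator to a 'spark' queue.
    if task.__class__.__name__ == 'SparkOperator':
        task.queue = 'spark'

    # Force runs whose execution_date is more than a week old into a
    # 'backfill' pool (pool name is an assumption).
    execution_date = getattr(task_instance, 'execution_date', None)
    if execution_date and execution_date < datetime.now() - timedelta(days=7):
        task.pool = 'backfill'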
