Create DAG-level cluster policy #12184

Merged: turbaszek merged 5 commits into apache:master on Nov 13, 2020

Conversation

turbaszek
Member

This commit adds a new concept, `dag_policy`, which is checked
once for every DAG when the DagBag is created. It also improves
the documentation around cluster policies.

closes: #12179
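As a rough illustration of the idea described above, the sketch below shows a DAG-level policy that runs once per DAG alongside a task-level policy that runs once per task. It does not import Airflow: `FakeDag`, `FakeTask`, `PolicyViolation`, and `apply_policies` are hypothetical stand-ins invented for this example, and the order in which the two policies run is an assumption rather than something stated in this PR.

```python
from dataclasses import dataclass, field
from typing import List


class PolicyViolation(Exception):
    """Hypothetical stand-in for a cluster-policy violation error."""


@dataclass
class FakeTask:
    """Hypothetical stand-in for an Airflow task/operator."""
    task_id: str
    queue: str = "default"


@dataclass
class FakeDag:
    """Hypothetical stand-in for an Airflow DAG."""
    dag_id: str
    tags: List[str] = field(default_factory=list)
    tasks: List[FakeTask] = field(default_factory=list)


def dag_policy(dag: FakeDag) -> None:
    """DAG-level policy: reject any DAG that has no tags."""
    if not dag.tags:
        raise PolicyViolation(f"DAG {dag.dag_id} has no tags")


def task_policy(task: FakeTask) -> None:
    """Task-level policy: route every task to a single queue."""
    task.queue = "cluster_default"


def apply_policies(dag: FakeDag) -> None:
    """Run the DAG policy once, then the task policy for each task.

    The ordering is an assumption made for this sketch.
    """
    dag_policy(dag)
    for task in dag.tasks:
        task_policy(task)
```

In a real deployment the two hooks would presumably live in `airflow_local_settings.py` and receive real DAG and task objects; the stand-ins here only keep the example runnable on its own.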


@turbaszek
Member Author

CC @olchas @TobKed @jaketf

@turbaszek turbaszek added the area:Scheduler including HA (high availability) scheduler label Nov 8, 2020
     for task in dag.tasks:
-        settings.policy(task)
+        settings.task_policy(task)
Member

Here we should check whether `policy` is defined and run it as well, but with a deprecation warning.

Member Author

Done, this is what users will see:

root@817b1471dabe:/opt/airflow# airflow scheduler
/opt/airflow/airflow/models/dag.py:61: DeprecationWarning: Using `policy` in airflow_local_settings.py is deprecated. Please rename your `policy` to `task_policy`.
  from airflow.models.dagbag import DagBag
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-11-08 19:09:09,005] {scheduler_job.py:1248} INFO - Starting the scheduler
[2020-11-08 19:09:09,005] {scheduler_job.py:1253} INFO - Processing each file at most -1 times
[2020-11-08 19:09:09,114] {scheduler_job.py:1275} INFO - Resetting orphaned tasks for active dag runs

@turbaszek turbaszek requested a review from potiuk November 8, 2020 19:13
@github-actions

The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it!

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Nov 12, 2020
Member

@ashb ashb left a comment

One small change, otherwise looks good.

Comment on lines 50 to 58
# TODO: Remove once deprecated
if hasattr(settings, "policy"):
    warnings.warn(
        "Using `policy` in airflow_local_settings.py is deprecated. "
        "Please rename your `policy` to `task_policy`.",
        DeprecationWarning,
        stacklevel=2,
    )
    setattr(settings, "task_policy", settings.policy)  # pylint: disable=no-member
Member

This should be done in airflow.settings.import_local_settings function please.

Member Author

That was my initial idea, but somehow I decided to do this in dagbag 🤷‍♂️ Done in a250f41

Member

I'm just conscious that DagBag may go away at some point soon -- it does a lot less than it used to.

Also top-level code is a bit of a smell to me.
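The suggestion in this thread, moving the `policy` alias out of module top level and into the step that loads local settings, could look roughly like the sketch below. The helper name `alias_deprecated_policy` and its module argument are hypothetical, chosen for illustration; this is not the code that landed in a250f41, only the same check wrapped in a function that can be called once after the settings module is imported.

```python
import types
import warnings


def alias_deprecated_policy(settings_module: types.ModuleType) -> None:
    """Expose a legacy `policy` hook under the new `task_policy` name.

    Hypothetical helper: emits a DeprecationWarning only when the given
    module still defines `policy`, and leaves other modules untouched.
    """
    if hasattr(settings_module, "policy"):
        warnings.warn(
            "Using `policy` in airflow_local_settings.py is deprecated. "
            "Please rename your `policy` to `task_policy`.",
            DeprecationWarning,
            stacklevel=2,
        )
        # Point the new name at the user's existing function.
        settings_module.task_policy = settings_module.policy
```

Keeping the check inside a function means nothing runs as a side effect of importing the module, and the behaviour can be tested by passing in a throwaway module object.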

@github-actions github-actions bot removed the full tests needed We need to run full set of tests for this PR to merge label Nov 12, 2020
@turbaszek turbaszek requested a review from ashb November 12, 2020 14:22
@turbaszek turbaszek requested a review from ashb November 12, 2020 18:39
@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Nov 12, 2020
turbaszek and others added 4 commits November 13, 2020 11:57
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
@turbaszek turbaszek merged commit 1222ebd into apache:master Nov 13, 2020
@turbaszek turbaszek deleted the dag-cluster-policy branch November 13, 2020 13:32
Successfully merging this pull request may close these issues.

Add DAG level cluster policy