[AIRFLOW-4415] skip status propagation #5189
cc: @ArgentFalcon
@fmao There are flake8 violations that need to be resolved so the tests can run on your code:
Codecov Report
@@            Coverage Diff             @@
##           master    #5189      +/-   ##
==========================================
- Coverage   80.02%   78.98%   -1.05%
==========================================
  Files         594      480     -114
  Lines       34769    30128    -4641
==========================================
- Hits        27824    23796    -4028
+ Misses       6945     6332     -613
Continue to review full report at Codecov.
@milton0825 @ArgentFalcon Any follow-up on this commit?
@feng-tao This pull request might need a bit more effort to review.
@fmao please rebase onto master
@feluelle rebased.
I think there is an issue with rebasing
08d8365 to 46a5712
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
I'm not sure that this actually fixes the issue, because it doesn't propagate the skip state, which is the issue I've generally seen.
@@ -264,7 +264,7 @@ def __init__(
         on_failure_callback: Optional[Callable] = None,
         on_success_callback: Optional[Callable] = None,
         on_retry_callback: Optional[Callable] = None,
-        trigger_rule: str = TriggerRule.ALL_SUCCESS,
+        trigger_rule: str = TriggerRule.NONE_FAILED,
This is changing the default trigger_rule to NONE_FAILED. So previously if any immediate upstream job is skipped, a task would not run. But after this PR this behaviour changes. A task will run even if its upstream tasks were skipped. That's going to cause problems for people relying on the current behaviour. Why not leave this change out and tell users to set the trigger rule to NONE_FAILED only if they want this new behaviour?
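To make the behaviour change concrete, here is a minimal, self-contained sketch of how the two rules treat a skipped upstream task. It is a simplified model and not Airflow's implementation: `can_run` and the plain state strings are hypothetical names used only for illustration.

```python
from typing import List


def can_run(trigger_rule: str, upstream_states: List[str]) -> bool:
    """Simplified model: decide whether a task may run given its
    direct upstream states. Names and strings are illustrative only."""
    if trigger_rule == "all_success":
        # Every upstream task must have succeeded; a skip blocks the task.
        return all(s == "success" for s in upstream_states)
    if trigger_rule == "none_failed":
        # Upstream tasks may have succeeded or been skipped.
        return all(s in ("success", "skipped") for s in upstream_states)
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


states = ["success", "skipped"]
print(can_run("all_success", states))  # False
print(can_run("none_failed", states))  # True
```

In a real DAG, the opt-in described above would mean passing trigger_rule=TriggerRule.NONE_FAILED to the individual operators that need it, leaving the default untouched.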
Closing this PR as it stands as:
Make sure you have checked all steps below.
Jira
My PR addresses the following Airflow JIRA issues and references them in the PR title.
https://issues.apache.org/jira/browse/AIRFLOW-4415
references:
https://issues.apache.org/jira/browse/AIRFLOW-719
https://issues.apache.org/jira/browse/AIRFLOW-982
https://issues.apache.org/jira/browse/AIRFLOW-983
The temporary fix was removed after 1.8.0 due to #2195.
In the latest branch, a new trigger rule was added in #4182.
Description
Issue: the skip status stops propagating to downstream tasks, and DAG runs stop at random with the DAG run status marked as failed.
The issue is present in version 1.8.1.
Version 1.8.0 contained a temporary fix, but it was removed after that version.
4077c6d
92965e8
Root cause:
In a loop, the scheduler evaluates each DAG and all of its task dependencies round by round.
Each round of evaluation happens twice: once in the context of flag_upstream_failed=False and once with flag_upstream_failed=True.
The DAG run update method marks the DAG run as deadlocked, which stops the DAG and all of its tasks from being processed any further.
https://github.com/apache/airflow/blob/1.8.1/airflow/models.py#L4184
This happens in the no_dependencies_met check. The ALL_SUCCESS trigger rule does not account for the skipped status and marks the task as failed when its upstream contains only skipped tasks.
https://github.com/apache/airflow/blob/1.8.1/airflow/models.py#L4152
https://github.com/apache/airflow/blob/1.8.1/airflow/ti_deps/deps/trigger_rule_dep.py#L165
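The deadlock check described above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the code in models.py: `all_success_met` and `is_deadlocked` are hypothetical helpers, task states are plain strings (or None for "no state yet"), and any other conditions the real check may apply are ignored.

```python
from typing import Dict, List, Optional


def all_success_met(upstream_states: List[Optional[str]]) -> bool:
    """Simplified ALL_SUCCESS check: any upstream task that has not
    succeeded (including a skipped one) counts against the task."""
    return all(s == "success" for s in upstream_states)


def is_deadlocked(unfinished: Dict[str, List[Optional[str]]]) -> bool:
    """`unfinished` maps each unfinished task id to the states of its
    direct upstream tasks. The run is treated as deadlocked when there
    are unfinished tasks and none of them has its dependencies met."""
    return bool(unfinished) and all(
        not all_success_met(states) for states in unfinished.values()
    )


# "a" was skipped; "b" depends on "a", and "c" depends on "b".
print(is_deadlocked({"b": ["skipped"], "c": [None]}))  # True
# With a successful upstream, "b" can still make progress.
print(is_deadlocked({"b": ["success"], "c": [None]}))  # False
```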
Each DAG run update checks all task dependencies and sends ready tasks to run in the context of flag_upstream_failed=False (the default)
https://github.com/apache/airflow/blob/1.8.1/airflow/models.py#L4156 which does not handle skip status propagation.
After the DAG run update, all task dependencies are checked again and ready tasks are sent to run in the context of flag_upstream_failed=True
https://github.com/apache/airflow/blob/1.8.1/airflow/jobs.py#L904
which handles skip status propagation.
https://github.com/apache/airflow/blob/1.8.1/airflow/ti_deps/deps/trigger_rule_dep.py#L138
Two potential causes can lead the DAG run update to detect a deadlock:
Skip status propagation relies on detecting skipped upstream tasks (which happens asynchronously, as other nodes write the skipped status to the database).
If the tasks being evaluated are not in topological order (they are sorted by priority_weight, which is effectively random with respect to the graph), multiple loop rounds are required to propagate the skip status to all downstream tasks.
Depending on how closely the fetched order matches the topological order, the propagation may reach further or stop sooner within a round.
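The effect of evaluation order on propagation can be illustrated with a small simulation. This is a sketch under simplifying assumptions, not scheduler code: `rounds_to_propagate` is a hypothetical helper, and a task is marked skipped as soon as all of its direct upstream tasks are skipped.

```python
from typing import Dict, List, Optional


def rounds_to_propagate(
    upstream: Dict[str, List[str]],
    initial_state: Dict[str, Optional[str]],
    order: List[str],
) -> int:
    """Count the evaluation rounds needed until no task changes state.
    Within a round, tasks are visited in `order`."""
    state = dict(initial_state)
    rounds = 0
    while True:
        changed = False
        for task in order:
            parents = upstream[task]
            if (
                state[task] is None
                and parents
                and all(state[p] == "skipped" for p in parents)
            ):
                state[task] = "skipped"
                changed = True
        if not changed:
            return rounds
        rounds += 1


# Linear chain a -> b -> c -> d, where "a" has already been skipped.
upstream = {"a": [], "b": ["a"], "c": ["b"], "d": ["c"]}
initial = {"a": "skipped", "b": None, "c": None, "d": None}

print(rounds_to_propagate(upstream, initial, ["a", "b", "c", "d"]))  # 1
print(rounds_to_propagate(upstream, initial, ["d", "c", "b", "a"]))  # 3
```

In topological order the skip reaches the end of the chain in a single round; in reverse order it advances only one task per round.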
Deadlock detection can be avoided only if the following conditions hold at the same time:
Fix approaches:
Change the ALL_SUCCESS trigger rule to compute num_failures = upstream - successes - skipped.
This prevents deadlock detection from being triggered. If the tasks are not ordered, multiple rounds are required, and all of the tasks will eventually be marked as skipped. Alternatively, add an additional trigger rule, which is [AIRFLOW-3336] Add new TriggerRule that will consider skipped ancestors as success #4182, and make it the default trigger rule.
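As a worked example of this approach, the sketch below compares the two counts for a task whose upstream tasks are partly skipped. It assumes the existing ALL_SUCCESS count is upstream - successes, which is what the proposed formula implies; the function names are hypothetical.

```python
def num_failures_current(upstream: int, successes: int) -> int:
    """Assumed existing count: every non-successful upstream is a failure."""
    return upstream - successes


def num_failures_proposed(upstream: int, successes: int, skipped: int) -> int:
    """Proposed count: skipped upstream tasks are no longer failures."""
    return upstream - successes - skipped


# Three upstream tasks: one succeeded, two were skipped.
upstream, successes, skipped = 3, 1, 2
print(num_failures_current(upstream, successes))            # 2
print(num_failures_proposed(upstream, successes, skipped))  # 0
```

With the proposed count the task is not reported as failing its dependencies, so the run is not flagged as deadlocked while the skip status is still propagating.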
Order tasks topologically to speed up skip status propagation within one round of evaluation.
https://github.com/apache/airflow/blob/1.8.1/airflow/jobs.py#L893
tis = sorted(tis, key=lambda x: x.priority_weight, reverse=True)
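For comparison with the priority_weight sort above, a topological ordering could be computed as in the sketch below, which uses Kahn's algorithm. The `downstream` mapping and `topological_order` are hypothetical names; how the task graph would actually be obtained inside the scheduler is not shown here.

```python
from collections import deque
from typing import Dict, List


def topological_order(downstream: Dict[str, List[str]]) -> List[str]:
    """Return task ids so that every task appears after all of its
    upstream tasks. `downstream` maps each task id to its direct
    downstream task ids; every task must appear as a key."""
    indegree = {task: 0 for task in downstream}
    for children in downstream.values():
        for child in children:
            indegree[child] += 1

    # Start with tasks that have no upstream dependencies.
    ready = deque(sorted(t for t, d in indegree.items() if d == 0))
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in downstream[task]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    if len(order) != len(downstream):
        raise ValueError("cycle detected in task graph")
    return order


# Chain a -> b -> c -> d, declared in arbitrary order.
downstream = {"d": [], "c": ["d"], "b": ["c"], "a": ["b"]}
print(topological_order(downstream))  # ['a', 'b', 'c', 'd']
```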
Tests
Commits
Documentation
Code Quality
flake8