New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix entire DAG stops when one task has end_date #20920
Conversation
Please revert all the whitespace changes. They make static checks fail, and it’s extremely difficult to review what you actually want to change due to all the noise. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looks good to me but requires another commiter approval. Also it needs additional test covering the scenario, to avoid regressions in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need another commiter approval.
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
@chenglongyan please rebase |
I'm sorry, I've been too busy lately, I'll follow up in time |
def task_filter(task: "Operator") -> bool: | ||
return task.task_id not in task_ids and ( | ||
self.is_backfill or task.start_date <= self.execution_date | ||
self.is_backfill | ||
or task.start_date <= self.execution_date | ||
and (task.end_date is None or self.execution_date <= task.end_date) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let’s refactor this to early returns to improve readability, something like
def task_filter(task: "Operator") -> bool:
if task.task_id in task_ids:
return False
if self.is_backfill:
return ...
...
return ...
Closes: #19917 , #20471