Apache Airflow version: 2.0, LocalExecutor
Environment: Docker on Windows 10 with WSL using image apache/airflow:2.0.0-python3.8
What happened:
Situation:
- There is a DAG, say `mydag`, with `catchup=True` and `max_active_runs=1`
- Let's say there are two DAG runs, t=0 and t=1
- The first task of the DAG is a sensor that senses whether the previous DAG run was successful
- Now, t=0 gets run, tasks are scheduled, and a task in t=0 fails
- Then, t=1 gets run, and the first task - the sensor - cannot sense a successful previous run, so it keeps sensing
- Now I clear the failed task in t=0 and expect it to run again, as it did in Airflow 1.x
- It doesn't - instead the scheduler gives the following:
scheduler_1 | [2020-12-31 15:25:32,770] {scheduler_job.py:1667} INFO - DAG mydag already has 1 active runs, not queuing any tasks for run 2020-12-26 05:00:00+00:00 [note: this is t=0]
Thus, t=0 never finishes, t=1 never senses a finished run, and any t=n with n>1 also has no chance of ever succeeding.
One alternative would be to remove the max_active_runs constraint, but that is not feasible, as this would create hundreds of DAG runs at once and that is a complete and total performance killer.
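For context, here is a minimal sketch of the kind of DAG described above. The identifiers, dates, schedule, and choice of sensor are illustrative assumptions on my part, not the actual DAG:

```python
# Illustrative sketch only -- names, dates, and the sensor choice are assumptions,
# not the actual DAG; it just mirrors the setup described above.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id="mydag",
    start_date=datetime(2020, 12, 20),
    schedule_interval="@daily",
    catchup=True,
    max_active_runs=1,
) as dag:
    # First task: wait until the previous run of this same DAG finished successfully.
    wait_for_previous_run = ExternalTaskSensor(
        task_id="wait_for_previous_run",
        external_dag_id="mydag",
        external_task_id=None,              # sense the whole previous DAG run, not a single task
        execution_delta=timedelta(days=1),  # the run one schedule interval earlier
        mode="reschedule",
    )

    do_work = DummyOperator(task_id="do_work")

    wait_for_previous_run >> do_work
```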
What you expected to happen:
As in previous Airflow versions, I would expect the cleared tasks to be scheduled again, which they are not.
Why this happens:
tl;dr: this happens because Airflow uses TI (TaskInstance) instead of DR (DagRun) here: https://github.com/apache/airflow/blob/v2-0-stable/airflow/jobs/scheduler_job.py#L1499-L1509
_do_scheduling() runs _schedule_dag_run() once for each dag_id and passes in the set of currently active DAG runs, here: https://github.com/apache/airflow/blob/v2-0-stable/airflow/jobs/scheduler_job.py#L1515. The tasks that should be queued are not queued because their DAG runs are missing from that set, even though those runs are in the running state. That is because https://github.com/apache/airflow/blob/v2-0-stable/airflow/jobs/scheduler_job.py#L1499-L1509 builds the set from the TaskInstances of each DAG run and their execution dates instead of from the DagRuns themselves, and since the tasks were successful or failed and then cleared, they are filtered out by the query. Replacing TI with DR in that query should fix this issue without breaking anything that currently works.
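Roughly, the suggested fix amounts to building the set of active runs from DagRun rather than from TaskInstance. The following is only a sketch of the query shape under that assumption; the helper name is made up and this is not the exact scheduler code:

```python
# Sketch of the proposed query shape only -- the real logic lives in
# airflow/jobs/scheduler_job.py; this is not the exact implementation.
from airflow.models import DagRun
from airflow.utils.state import State


def active_run_dates(session, dag_ids):
    """Return (dag_id, execution_date) pairs for DAG runs that are still running."""
    return set(
        session.query(DagRun.dag_id, DagRun.execution_date)
        .filter(DagRun.dag_id.in_(dag_ids), DagRun.state == State.RUNNING)
        .all()
    )
```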
How to reproduce it:
You don't need the sensor logic described above to reproduce this behavior. While I have not run these exact steps, the following should reproduce it:
- Create a DAG `mydag` with `catchup=True` and `max_active_runs=1`
- Just give it a dummy task or something, and let it run a couple of times so you have a couple of successful DAG runs
- Pause the DAG*
- Clear a couple of tasks in dag runs that were successful
- Run this snippet to see the result of the query with `TI` and `DR`, respectively:
```python
from airflow import models, settings
from airflow.utils.state import State

TI = models.TaskInstance
DR = models.DagRun
dag_id = "mydag"

result = "\n\nactive DAG runs according to current code logic:"
for data_tuple in settings.Session().query(TI.dag_id, TI.execution_date).filter(TI.dag_id.in_([dag_id]), TI.state.notin_(list(State.finished))):
    result += "\n\t" + str(data_tuple)

result += "\n\nactive DAG runs according to my proposed code logic:"
for data_tuple in settings.Session().query(DR.dag_id, DR.execution_date).filter(DR.dag_id.in_([dag_id]), DR.state.in_([State.RUNNING])):
    result += "\n\t" + str(data_tuple)

print(result, "\n")
```

*Pausing the DAG only prevents your Airflow instance from working through the DAG runs one by one; you would not need to pause if your DAG starts with a sensor that senses the success of the previous run, like mine does.
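In the output, a running DAG run whose tasks were cleared should show up only in the second (DR-based) list; the TI-based query filters it out, which is exactly why the scheduler does not consider it active and never queues its tasks.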
I will be creating a PR with the suggested fix shortly.