
Commit

fix: DagRuns with UPSTREAM_FAILED tasks get stuck in the backfill. (a…
avkirilishin authored and amoghrajesh committed Jan 30, 2024
1 parent 731a6b4 commit 817c5a4
Showing 3 changed files with 70 additions and 4 deletions.
8 changes: 5 additions & 3 deletions airflow/jobs/backfill_job_runner.py
@@ -106,7 +106,7 @@ class _DagRunTaskStatus:
     failed: set[TaskInstanceKey] = attr.ib(factory=set)
     not_ready: set[TaskInstanceKey] = attr.ib(factory=set)
     deadlocked: set[TaskInstance] = attr.ib(factory=set)
-    active_runs: list[DagRun] = attr.ib(factory=list)
+    active_runs: set[DagRun] = attr.ib(factory=set)
     executed_dag_run_dates: set[pendulum.DateTime] = attr.ib(factory=set)
     finished_runs: int = 0
     total_runs: int = 0
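Why the container change matters: with a list, the same DagRun could end up tracked twice when a run is re-activated, and membership tests were O(n). A set makes re-adding idempotent and the "in" check cheap. A minimal, self-contained sketch (FakeDagRun is a stand-in for illustration, not the Airflow model; like SQLAlchemy ORM instances, it hashes by object identity):

    # Stand-in class, purely illustrative: default hashing is by identity,
    # so a set deduplicates repeated adds of the same instance.
    class FakeDagRun:
        def __init__(self, run_id: str) -> None:
            self.run_id = run_id

    dr = FakeDagRun("backfill__2010-01-02")

    runs_list: list[FakeDagRun] = []
    runs_list.append(dr)
    runs_list.append(dr)      # old behaviour: the same run tracked twice
    assert len(runs_list) == 2

    runs_set: set[FakeDagRun] = set()
    runs_set.add(dr)
    runs_set.add(dr)          # new behaviour: re-adding is a no-op
    assert len(runs_set) == 1
    assert dr in runs_set     # O(1) membership test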
@@ -518,6 +518,8 @@ def _per_task_process(key, ti: TaskInstance, session):
                     ti_status.running.pop(key)
                     # Reset the failed task in backfill to scheduled state
                     ti.set_state(TaskInstanceState.SCHEDULED, session=session)
+                    if ti.dag_run not in ti_status.active_runs:
+                        ti_status.active_runs.add(ti.dag_run)
                 else:
                     # Default behaviour which works for subdag.
                     if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED):
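This hunk is the heart of the fix. With rerun_failed_tasks, a FAILED task is reset to SCHEDULED, but its DagRun may already have been dropped from active_runs (for example, once the run was finalized because a downstream task went to UPSTREAM_FAILED). Only runs in active_runs get their state re-evaluated, so the re-scheduled task's run was never looked at again and the backfill spun forever. The membership check is technically redundant now that active_runs is a set, since add is idempotent, but it makes the intent explicit. A condensed, hypothetical sketch of the invariant being restored, with made-up names rather than the real runner loop:

    # Hypothetical sketch: any task instance put back on the queue must have
    # its DagRun tracked again, or the runner never observes that run
    # reaching a terminal state.
    def requeue_failed_ti(ti, ti_status) -> None:
        ti.set_state("scheduled")              # reset FAILED -> SCHEDULED
        ti_status.active_runs.add(ti.dag_run)  # the line this hunk adds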
@@ -738,7 +740,7 @@ def to_keep(key: TaskInstanceKey) -> bool:
             session.commit()
 
             # update dag run state
-            _dag_runs = ti_status.active_runs[:]
+            _dag_runs = ti_status.active_runs.copy()
             for run in _dag_runs:
                 run.update_state(session=session)
                 if run.state in State.finished_dr_states:
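A follow-on change: active_runs[:] is list slicing, which sets do not support, so the snapshot becomes .copy(). The snapshot itself still matters, presumably because finished runs are removed from the live set inside this loop (the hunk is truncated here), and mutating a set while iterating over it raises RuntimeError. A quick standalone demonstration:

    s = {1, 2, 3}
    try:
        for x in s:
            s.discard(x)       # mutating the live set mid-iteration
    except RuntimeError as exc:
        print(exc)             # set changed size during iteration

    for x in s.copy():         # safe: iterate a snapshot, mutate the original
        s.discard(x)
    assert not s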
@@ -840,7 +842,7 @@ def _execute_dagruns(
             dag_run = self._get_dag_run(dagrun_info, dag, session=session)
             if dag_run is not None:
                 tis_map = self._task_instances_for_dag_run(dag, dag_run, session=session)
-                ti_status.active_runs.append(dag_run)
+                ti_status.active_runs.add(dag_run)
                 ti_status.to_run.update(tis_map or {})
 
         processed_dag_run_dates = self._process_backfill_task_instances(
36 changes: 36 additions & 0 deletions tests/dags/test_backfill_with_upstream_failed_task.py
@@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import datetime

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="test_backfill_with_upstream_failed_task",
    default_args={"retries": 0, "start_date": datetime.datetime(2010, 1, 1)},
    schedule="0 0 * * *",
)

failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)
downstream_task = BashOperator(task_id="downstream_task", bash_command="echo 1", dag=dag)
downstream_task.set_upstream(failing_task)

if __name__ == "__main__":
    dag.cli()
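The fixture DAG is deliberately minimal: failing_task always exits non-zero, so downstream_task lands in UPSTREAM_FAILED, the exact state that used to wedge the backfill, and with retries set to 0 the first failure is terminal. For reference, the set_upstream call is equivalent to the bitshift form:

    # Equivalent dependency declaration:
    failing_task >> downstream_task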
30 changes: 29 additions & 1 deletion tests/jobs/test_backfill_job.py
@@ -1916,7 +1916,7 @@ def consumer(value):
         executor = MockExecutor()
 
         ti_status = BackfillJobRunner._DagRunTaskStatus()
-        ti_status.active_runs.append(dr)
+        ti_status.active_runs.add(dr)
         ti_status.to_run = {ti.key: ti for ti in dr.task_instances}
 
         job = Job(executor=executor)
@@ -2103,3 +2103,31 @@ def test_backfill_disable_retry(self, dag_maker, disable_retry, try_number, exce
         assert dag_run.state == DagRunState.FAILED
 
         dag.clear()
+
+    def test_backfill_failed_dag_with_upstream_failed_task(self, dag_maker):
+        self.dagbag.process_file(str(TEST_DAGS_FOLDER / "test_backfill_with_upstream_failed_task.py"))
+        dag = self.dagbag.get_dag("test_backfill_with_upstream_failed_task")
+
+        # We have to fake perform_heartbeat because of the 'is_unit_test' check in the original.
+        # Rather than calling the real perform_heartbeat, we simply wait out one LocalExecutor
+        # worker cycle. The sleep-based approach works well for now, but it could be replaced
+        # by checking the state of the LocalTaskJob.
+        def fake_perform_heartbeat(*args, **kwargs):
+            import time
+
+            time.sleep(1)
+
+        with mock.patch("airflow.jobs.backfill_job_runner.perform_heartbeat", fake_perform_heartbeat):
+            job = Job(executor=ExecutorLoader.load_executor("LocalExecutor"))
+            job_runner = BackfillJobRunner(
+                job=job,
+                dag=dag,
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE,
+                rerun_failed_tasks=True,
+            )
+            with pytest.raises(BackfillUnfinished):
+                run_job(job=job, execute_callable=job_runner._execute)
+
+        dr: DagRun = dag.get_last_dagrun()
+        assert dr.state == State.FAILED
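A note on the expected outcome: the run genuinely contains failed tasks, so once the stuck-DagRun bug is fixed the backfill terminates and reports them by raising BackfillUnfinished, rather than looping forever; the final assertion then confirms the DagRun itself reached FAILED instead of staying RUNNING. To run just this test locally, an invocation along these lines should work (exact flags depend on the repository's test setup): pytest tests/jobs/test_backfill_job.py -k test_backfill_failed_dag_with_upstream_failed_task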
