Skip to content

Commit

Permalink
Fix changing the parent dag state on subdag clear (#15562)
Browse files Browse the repository at this point in the history
Closes: apache/airflow#15374
This pull request follows apache/airflow#14776.

Clearing a subdag with Downstream+Recursive does not automatically set the state of the parent dag so that the downstream parent tasks can execute.

GitOrigin-RevId: a4211e276fce6521f0423fe94b01241a9c43a22c
  • Loading branch information
zarrarrana authored and Cloud Composer Team committed Dec 7, 2022
1 parent f8350f7 commit 0c57d6f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,7 @@ def clear(
tis = tis.filter(TI.task_id.in_(self.task_ids))

if include_parentdag and self.is_subdag and self.parent_dag is not None:
dag_ids.append(self.parent_dag.dag_id)
p_dag = self.parent_dag.sub_dag(
task_ids_or_regex=r"^{}$".format(self.dag_id.split('.')[1]),
include_upstream=False,
Expand Down
57 changes: 57 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,63 @@ def test_clear_set_dagrun_state_for_subdag(self, dag_run_state):
)
assert dagrun.state == dag_run_state

@parameterized.expand(
[
(State.NONE,),
(State.RUNNING,),
]
)
def test_clear_set_dagrun_state_for_parent_dag(self, dag_run_state):
dag_id = 'test_clear_set_dagrun_state_parent_dag'
self._clean_up(dag_id)
task_id = 't1'
dag = DAG(dag_id, start_date=DEFAULT_DATE, max_active_runs=1)
t_1 = DummyOperator(task_id=task_id, dag=dag)
subdag = DAG(dag_id + '.test', start_date=DEFAULT_DATE, max_active_runs=1)
SubDagOperator(task_id='test', subdag=subdag, dag=dag)
t_2 = DummyOperator(task_id='task', dag=subdag)
subdag.parent_dag = dag
subdag.is_subdag = True

session = settings.Session()
dagrun_1 = dag.create_dagrun(
run_type=DagRunType.BACKFILL_JOB,
state=State.FAILED,
start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE,
)
dagrun_2 = subdag.create_dagrun(
run_type=DagRunType.BACKFILL_JOB,
state=State.FAILED,
start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE,
)
session.merge(dagrun_1)
session.merge(dagrun_2)
task_instance_1 = TI(t_1, execution_date=DEFAULT_DATE, state=State.RUNNING)
task_instance_2 = TI(t_2, execution_date=DEFAULT_DATE, state=State.RUNNING)
session.merge(task_instance_1)
session.merge(task_instance_2)
session.commit()

subdag.clear(
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=1),
dag_run_state=dag_run_state,
include_subdags=True,
include_parentdag=True,
session=session,
)

dagrun = (
session.query(
DagRun,
)
.filter(DagRun.dag_id == dag_id)
.one()
)
assert dagrun.state == dag_run_state

@parameterized.expand(
[(state, State.NONE) for state in State.task_states if state != State.RUNNING]
+ [(State.RUNNING, State.SHUTDOWN)]
Expand Down

0 comments on commit 0c57d6f

Please sign in to comment.