Skip to content

Commit

Permalink
Fix running child tasks in a subdag after clearing a successful subdag (#14776)
Browse files Browse the repository at this point in the history

After successfully running a SUBDAG, clearing it
(including downstream+recursive) doesn't trigger the inner tasks.
Instead, the subdag is marked successful and the inner tasks all
stay cleared and aren't re-run.

The problem occurs because the DagRun state of the subdags is not updated
after clearing. This PR solves it by updating the DagRun state of all DAGs,
including subdags, when include_subdags is True.

(cherry picked from commit 0521635)
  • Loading branch information
ephraimbuddy authored and ashb committed Mar 18, 2021
1 parent ac43056 commit f4cc5c5
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
10 changes: 8 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1116,13 +1116,15 @@ def set_dag_runs_state(
session: Session = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
dag_ids: List[str] = None,
) -> None:
query = session.query(DagRun).filter_by(dag_id=self.dag_id)
dag_ids = dag_ids or [self.dag_id]
query = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
if start_date:
query = query.filter(DagRun.execution_date >= start_date)
if end_date:
query = query.filter(DagRun.execution_date <= end_date)
query.update({DagRun.state: state})
query.update({DagRun.state: state}, synchronize_session='fetch')

@provide_session
def clear(
Expand Down Expand Up @@ -1183,11 +1185,13 @@ def clear(
"""
TI = TaskInstance
tis = session.query(TI)
dag_ids = []
if include_subdags:
# Crafting the right filter for dag_id and task_ids combo
conditions = []
for dag in self.subdags + [self]:
conditions.append((TI.dag_id == dag.dag_id) & TI.task_id.in_(dag.task_ids))
dag_ids.append(dag.dag_id)
tis = tis.filter(or_(*conditions))
else:
tis = session.query(TI).filter(TI.dag_id == self.dag_id)
Expand Down Expand Up @@ -1327,11 +1331,13 @@ def clear(
dag=self,
activate_dag_runs=False, # We will set DagRun state later.
)

self.set_dag_runs_state(
session=session,
start_date=start_date,
end_date=end_date,
state=dag_run_state,
dag_ids=dag_ids,
)
else:
count = 0
Expand Down
55 changes: 55 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,61 @@ def test_clear_set_dagrun_state(self, dag_run_state):
dagrun = dagruns[0] # type: DagRun
assert dagrun.state == dag_run_state

@parameterized.expand(
[
(State.NONE,),
(State.RUNNING,),
]
)
def test_clear_set_dagrun_state_for_subdag(self, dag_run_state):
# Regression test: clearing a parent DAG with include_subdags=True must also
# set the subdag's DagRun state to the requested ``dag_run_state``.
# Runs once per parameterized value (State.NONE and State.RUNNING).
dag_id = 'test_clear_set_dagrun_state_subdag'
# NOTE(review): _clean_up is defined outside this excerpt; presumably it removes
# leftover rows for this dag_id from earlier runs -- confirm against the helper.
self._clean_up(dag_id)
task_id = 't1'
# Parent DAG with one plain task (t1) and one SubDagOperator ('test') that wraps
# a subdag whose id is '<parent dag_id>.test' and which holds a single task.
dag = DAG(dag_id, start_date=DEFAULT_DATE, max_active_runs=1)
t_1 = DummyOperator(task_id=task_id, dag=dag)
subdag = DAG(dag_id + '.test', start_date=DEFAULT_DATE, max_active_runs=1)
SubDagOperator(task_id='test', subdag=subdag, dag=dag)
t_2 = DummyOperator(task_id='task', dag=subdag)

session = settings.Session()
# Both the parent and the subdag get a DagRun in FAILED state at DEFAULT_DATE,
# so a state change after clear() is observable for either parameter value.
dagrun_1 = dag.create_dagrun(
run_type=DagRunType.BACKFILL_JOB,
state=State.FAILED,
start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE,
)
dagrun_2 = subdag.create_dagrun(
run_type=DagRunType.BACKFILL_JOB,
state=State.FAILED,
start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE,
)
session.merge(dagrun_1)
session.merge(dagrun_2)
# One RUNNING task instance per DAG (parent task t1, subdag task 'task').
task_instance_1 = TI(t_1, execution_date=DEFAULT_DATE, state=State.RUNNING)
task_instance_2 = TI(t_2, execution_date=DEFAULT_DATE, state=State.RUNNING)
session.merge(task_instance_1)
session.merge(task_instance_2)
# Persist the fixtures before clear() is called with the same session.
session.commit()

# Clear a window starting at DEFAULT_DATE and ending one day later, which
# contains the DEFAULT_DATE runs created above. Subdags are included; the
# parent-DAG lookup is explicitly disabled.
dag.clear(
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=1),
dag_run_state=dag_run_state,
include_subdags=True,
include_parentdag=False,
session=session,
)

# Only the subdag's DagRun is inspected; the parent's run is not asserted here.
# .one() expects exactly one DagRun row for the subdag's dag_id.
dagrun = (
session.query(
DagRun,
)
.filter(DagRun.dag_id == subdag.dag_id)
.one()
)
assert dagrun.state == dag_run_state

@parameterized.expand(
[(state, State.NONE) for state in State.task_states if state != State.RUNNING]
+ [(State.RUNNING, State.SHUTDOWN)]
Expand Down

0 comments on commit f4cc5c5

Please sign in to comment.