Skip to content

Commit

Permalink
Apply suggestions from code review and fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy committed Mar 17, 2021
1 parent 741fa56 commit cec7ef2
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
4 changes: 2 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1112,11 +1112,11 @@ def topological_sort(self, include_subdag_tasks: bool = False):
@provide_session
def set_dag_runs_state(
self,
dag_ids: List[str] = None,
state: str = State.RUNNING,
session: Session = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
dag_ids: List[str] = None,
) -> None:
dag_ids = dag_ids or [self.dag_id]
query = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
Expand Down Expand Up @@ -1333,11 +1333,11 @@ def clear(
)

self.set_dag_runs_state(
dag_ids=dag_ids,
session=session,
start_date=start_date,
end_date=end_date,
state=dag_run_state,
dag_ids=dag_ids,
)
else:
count = 0
Expand Down
19 changes: 10 additions & 9 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1328,8 +1328,14 @@ def test_clear_set_dagrun_state_for_subdag(self, dag_run_state):
start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE,
)
dagrun_2 = subdag.create_dagrun(
run_type=DagRunType.BACKFILL_JOB,
state=State.FAILED,
start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE,
)
session.merge(dagrun_1)

session.merge(dagrun_2)
task_instance_1 = TI(t_1, execution_date=DEFAULT_DATE, state=State.RUNNING)
task_instance_2 = TI(t_2, execution_date=DEFAULT_DATE, state=State.RUNNING)
session.merge(task_instance_1)
Expand All @@ -1345,18 +1351,13 @@ def test_clear_set_dagrun_state_for_subdag(self, dag_run_state):
session=session,
)

dagruns = (
dagrun = (
session.query(
DagRun,
)
.filter(
DagRun.dag_id.in_([dag_id, dag_id + '.test']),
)
.all()
.filter(DagRun.dag_id == subdag.dag_id)
.one()
)

assert len(dagruns) == 1
dagrun = dagruns[0] # type: DagRun
assert dagrun.state == dag_run_state

@parameterized.expand(
Expand Down

0 comments on commit cec7ef2

Please sign in to comment.