Skip to content

Commit

Permalink
fail the tasks instances rather than set to none
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy committed Jan 13, 2022
1 parent 4ca34f6 commit 3c5079f
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 2 deletions.
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
task_instance,
)
session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
{TI.state: State.NONE}, synchronize_session='fetch'
{TI.state: State.FAILED}, synchronize_session='fetch'
)
continue
if serialized_dag.has_task(task_instance.task_id):
Expand Down
2 changes: 1 addition & 1 deletion tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
assert 0 == len(res)
tis = dr.get_task_instances(session=session)
assert len(tis) == 2
assert all(ti.state == State.NONE for ti in tis)
assert all(ti.state == State.FAILED for ti in tis)

def test_nonexistent_pool(self, dag_maker):
dag_id = 'SchedulerJobTest.test_nonexistent_pool'
Expand Down

0 comments on commit 3c5079f

Please sign in to comment.