Skip to content

Commit

Permalink
Fix slow (cleared) tasks being adopted by Celery worker. (#16718)
Browse files Browse the repository at this point in the history
Celery executor is currently adopting anything that has ever run before and has been cleared since then.

**Example of the issue:**
We have a DAG that runs over 150 sensor tasks and 50 ETL tasks with a concurrency of 3 and max_active_runs of 16. This setup is required because we want to divide the resources and we don't want this DAG to take up all of them. As a result, many tasks stay in the scheduled state for a while, because they can't be queued due to the concurrency limit of 3. However, with the current implementation, if these tasks have ever run before, they get adopted by the scheduler's executor instance and become stuck forever [without this PR](#16550). They should never have been adopted in the first place.

**Contents of the PR**:
1. Tasks that are in the scheduled state should never have arrived at an executor. Hence, we remove the scheduled state from the set of task states that are eligible for adoption.
2. Given that a task instance's `external_executor_id` is quite important in deciding whether it is adopted, we also reset it when we reset the state of the TaskInstance.

(cherry picked from commit 554a239)
  • Loading branch information
Jorricks authored and kaxil committed Aug 17, 2021
1 parent f7c6562 commit 69af899
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 8 deletions.
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job.py
Expand Up @@ -1859,7 +1859,7 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = None):
self.log.info("Marked %d SchedulerJob instances as failed", num_failed)
Stats.incr(self.__class__.__name__.lower() + '_end', num_failed)

resettable_states = [State.SCHEDULED, State.QUEUED, State.RUNNING]
resettable_states = [State.QUEUED, State.RUNNING]
query = (
session.query(TI)
.filter(TI.state.in_(resettable_states))
Expand Down
1 change: 1 addition & 0 deletions airflow/models/taskinstance.py
Expand Up @@ -170,6 +170,7 @@ def clear_task_instances(
# original max_tries or the last attempted try number.
ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries)
ti.state = State.NONE
ti.external_executor_id = None
session.merge(ti)

task_id_by_key[ti.dag_id][ti.execution_date][ti.try_number].add(ti.task_id)
Expand Down
2 changes: 2 additions & 0 deletions tests/executors/test_celery_executor.py
Expand Up @@ -329,9 +329,11 @@ def test_try_adopt_task_instances(self):
ti1 = TaskInstance(task=task_1, execution_date=exec_date)
ti1.external_executor_id = '231'
ti1.queued_dttm = queued_dttm
ti1.state = State.QUEUED
ti2 = TaskInstance(task=task_2, execution_date=exec_date)
ti2.external_executor_id = '232'
ti2.queued_dttm = queued_dttm
ti2.state = State.QUEUED

tis = [ti1, ti2]
executor = celery_executor.CeleryExecutor()
Expand Down
14 changes: 7 additions & 7 deletions tests/jobs/test_scheduler_job.py
Expand Up @@ -2124,9 +2124,9 @@ def test_adopt_or_reset_orphaned_tasks(self):
session=session,
)
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
ti.state = State.SCHEDULED
ti.state = State.QUEUED
ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
ti2.state = State.SCHEDULED
ti2.state = State.QUEUED
session.commit()

processor = mock.MagicMock()
Expand All @@ -2140,7 +2140,7 @@ def test_adopt_or_reset_orphaned_tasks(self):
assert ti.state == State.NONE

ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
assert ti2.state == State.SCHEDULED, "Tasks run by Backfill Jobs should not be reset"
assert ti2.state == State.QUEUED, "Tasks run by Backfill Jobs should not be reset"

@parameterized.expand(
[
Expand Down Expand Up @@ -3654,7 +3654,7 @@ def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self):
session=session,
)
ti = dr1.get_task_instances(session=session)[0]
ti.state = State.SCHEDULED
ti.state = State.QUEUED
session.merge(ti)
session.merge(dr1)
session.commit()
Expand Down Expand Up @@ -3799,12 +3799,12 @@ def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self):

ti1, ti2 = dr1.get_task_instances(session=session)
dr1.state = State.RUNNING
ti1.state = State.SCHEDULED
ti1.state = State.QUEUED
ti1.queued_by_job_id = old_job.id
session.merge(dr1)
session.merge(ti1)

ti2.state = State.SCHEDULED
ti2.state = State.QUEUED
ti2.queued_by_job_id = self.scheduler_job.id
session.merge(ti2)
session.flush()
Expand All @@ -3816,7 +3816,7 @@ def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self):
session.refresh(ti1)
assert ti1.state is None
session.refresh(ti2)
assert State.SCHEDULED == ti2.state
assert ti2.state == State.QUEUED
session.rollback()
if old_job.processor_agent:
old_job.processor_agent.end()
Expand Down
24 changes: 24 additions & 0 deletions tests/models/test_cleartasks.py
Expand Up @@ -56,6 +56,7 @@ def test_clear_task_instances(self):

ti0.run()
ti1.run()

with create_session() as session:
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
clear_task_instances(qry, session, dag=dag)
Expand All @@ -68,6 +69,29 @@ def test_clear_task_instances(self):
assert ti1.try_number == 2
assert ti1.max_tries == 3

# Regression test: clearing a task instance must also wipe its
# external_executor_id, not just its state.
# NOTE(review): indentation was lost in this view, so the extent of the
# `with create_session()` block below cannot be confirmed from here.
def test_clear_task_instances_external_executor_id(self):
dag = DAG(
'test_clear_task_instances_external_executor_id',
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=10),
)
task0 = DummyOperator(task_id='task0', owner='test', dag=dag)
# Build a task instance that looks like it already ran: SUCCESS state plus
# an executor id left over from that run.
ti0 = TI(task=task0, execution_date=DEFAULT_DATE)
ti0.state = State.SUCCESS
ti0.external_executor_id = "some_external_executor_id"

with create_session() as session:
# Persist the task instance so the query below can find it.
session.add(ti0)
session.commit()

# Clear every task instance belonging to this DAG.
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
clear_task_instances(qry, session, dag=dag)

# Reload the row so the assertions check the stored values.
ti0.refresh_from_db()

# After clearing, both the state and the executor id must be reset.
assert ti0.state is None
assert ti0.external_executor_id is None

def test_clear_task_instances_without_task(self):
dag = DAG(
'test_clear_task_instances_without_task',
Expand Down

0 comments on commit 69af899

Please sign in to comment.