Skip to content
Permalink
Browse files
Pools with negative open slots should not block other pools (#23143)
  • Loading branch information
tanelk committed May 9, 2022
1 parent 24bb9f3 commit 7132be2f11db24161940f57613874b4af86369c7
Showing 3 changed files with 38 additions and 6 deletions.
@@ -273,7 +273,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =

# If the pools are full, there is no point doing anything!
# If _somehow_ the pool is overfull, don't let the limit go negative - it breaks SQL
pool_slots_free = max(0, sum(pool['open'] for pool in pools.values()))
pool_slots_free = sum(max(0, pool['open']) for pool in pools.values())

if pool_slots_free == 0:
self.log.debug("All pools are full!")
@@ -191,11 +191,7 @@ def slots_stats(

# calculate open metric
for pool_name, stats_dict in pools.items():
if stats_dict["total"] == -1:
# -1 means infinite
stats_dict["open"] = -1
else:
stats_dict["open"] = stats_dict["total"] - stats_dict["running"] - stats_dict["queued"]
stats_dict["open"] = stats_dict["total"] - stats_dict["running"] - stats_dict["queued"]

return pools

@@ -1135,6 +1135,42 @@ def test_find_executable_task_instances_not_enough_task_concurrency_for_first(se

session.rollback()

def test_find_executable_task_instances_negative_open_pool_slots(self, dag_maker):
    """
    A pool that is over capacity must not stop tasks in other pools from queuing.

    A pool can end up with more slots in use than it owns when its total
    slot count is lowered while tasks are still running in it. Here ``pool2``
    owns a single slot but has a RUNNING task instance that takes two, while
    ``pool1`` owns a single slot and has one SCHEDULED task instance waiting.
    The scheduler is expected to hand back exactly that ``pool1`` task instance.
    """
    # Give the default pool no slots.
    set_default_pool_slots(0)

    self.scheduler_job = SchedulerJob(subdir=os.devnull)
    session = settings.Session()

    # Two single-slot pools, registered in the same order as the task setup below.
    for pool_name in ('pool1', 'pool2'):
        session.add(Pool(pool=pool_name, slots=1))

    dag_id = 'SchedulerJobTest.test_find_executable_task_instances_negative_open_pool_slots'
    with dag_maker(dag_id=dag_id):
        waiting_op = EmptyOperator(task_id='op1', pool='pool1')
        # Takes two slots from a one-slot pool, so pool2 is over capacity once it runs.
        oversized_op = EmptyOperator(task_id='op2', pool='pool2', pool_slots=2)

    dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)

    waiting_ti = dag_run.get_task_instance(waiting_op.task_id, session)
    oversized_ti = dag_run.get_task_instance(oversized_op.task_id, session)
    waiting_ti.state = State.SCHEDULED
    oversized_ti.state = State.RUNNING
    session.flush()

    queued = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
    # Only the pool1 task instance should have been picked up.
    assert len(queued) == 1
    assert queued[0].key == waiting_ti.key

    session.rollback()

@mock.patch('airflow.jobs.scheduler_job.Stats.gauge')
def test_emit_pool_starving_tasks_metrics(self, mock_stats_gauge, dag_maker):
self.scheduler_job = SchedulerJob(subdir=os.devnull)

0 comments on commit 7132be2

Please sign in to comment.