From 48533f3990680a2ba0d8de5e2c801c74d9d38a4b Mon Sep 17 00:00:00 2001 From: ColtenOuO Date: Fri, 1 May 2026 18:46:06 +0000 Subject: [PATCH 1/4] Fix N+1 query pattern in bulk pool delete endpoint --- .../core_api/services/public/pools.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py index d23b0acc02beb..dce54099772de 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py @@ -188,7 +188,9 @@ def handle_bulk_update(self, action: BulkUpdateAction[PoolBody], results: BulkAc def handle_bulk_delete(self, action: BulkDeleteAction[PoolBody], results: BulkActionResponse) -> None: """Bulk delete pools.""" to_delete_pool_names = set(action.entities) - _, matched_pool_names, not_found_pool_names = self.categorize_pools(to_delete_pool_names) + existing_pools_dict, matched_pool_names, not_found_pool_names = self.categorize_pools( + to_delete_pool_names + ) try: if action.action_on_non_existence == BulkActionNotOnExistence.FAIL and not_found_pool_names: @@ -196,16 +198,10 @@ def handle_bulk_delete(self, action: BulkDeleteAction[PoolBody], results: BulkAc status_code=status.HTTP_404_NOT_FOUND, detail=f"The pools with these pool names: {not_found_pool_names} were not found.", ) - if action.action_on_non_existence == BulkActionNotOnExistence.SKIP: - delete_pool_names = matched_pool_names - else: - delete_pool_names = to_delete_pool_names - for pool_name in delete_pool_names: - existing_pool = self.session.scalar(select(Pool).where(Pool.pool == pool_name).limit(1)) - if existing_pool: - self.session.delete(existing_pool) - results.success.append(pool_name) + for pool_name in matched_pool_names: + self.session.delete(existing_pools_dict[pool_name]) + results.success.append(pool_name) except HTTPException as e: results.errors.append({"error": f"{e.detail}", "status_code": e.status_code}) From 6ded2bfc14b47d6abb32abad67243af5aa0d3862 Mon Sep 17 00:00:00 2001 From: ColtenOuO Date: Sat, 2 May 2026 20:01:33 +0000 Subject: [PATCH 2/4] Add query-count test for bulk pool delete --- .../core_api/routes/public/test_pools.py | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py index 2741d0543351a..d259944f5a5bd 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py @@ -25,6 +25,7 @@ from airflow.models.team import Team from airflow.utils.session import provide_session +from tests_common.test_utils.asserts import count_queries from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_pools, clear_db_teams from tests_common.test_utils.logs import check_last_log @@ -1133,6 +1134,43 @@ def test_update_mask_preserves_other_fields(self, test_client, session): assert updated_pool.description is None # unchanged assert updated_pool.include_deferred is True # unchanged + def test_bulk_delete_query_count_is_independent_of_pool_count(self, test_client, session): + # Regression guard for the N+1 fix in BulkPoolService.handle_bulk_delete: + # the query count for a bulk delete must be the same regardless of how + # many pools are deleted. A regression that re-queries each pool inside + # the loop would add one SELECT per pool, so the larger run would issue + # strictly more queries than the smaller one. + counts: dict[int, int] = {} + for num_pools in (5, 10): + pool_names = [f"perf_pool_{num_pools}_{i}" for i in range(num_pools)] + session.add_all(Pool(pool=name, slots=1, include_deferred=False) for name in pool_names) + session.commit() + + request_body = { + "actions": [ + { + "action": "delete", + "entities": pool_names, + "action_on_non_existence": "fail", + } + ] + } + + with count_queries() as result: + response = test_client.patch("/pools", json=request_body) + counts[num_pools] = sum(result.values()) + + assert response.status_code == 200 + assert sorted(response.json()["delete"]["success"]) == sorted(pool_names) + remaining = session.scalars(select(Pool).where(Pool.pool.in_(pool_names))).all() + assert remaining == [] + + assert counts[5] == counts[10], ( + f"Bulk-delete query count is not constant: {counts[5]} queries for 5 pools " + f"vs {counts[10]} queries for 10 pools. A regression that re-queries pools " + f"inside the loop would add one SELECT per pool." + ) + def test_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.patch("/pools", json={}) assert response.status_code == 401 From c1542bc2ad2f1e8bb8eedacc1c6c646d758fed1c Mon Sep 17 00:00:00 2001 From: ColtenOuO Date: Sun, 17 May 2026 08:44:07 +0000 Subject: [PATCH 3/4] Refactor bulk-delete query-count test to parametrize --- .../core_api/routes/public/test_pools.py | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py index d259944f5a5bd..e899632befd7b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py @@ -1134,41 +1134,48 @@ def test_update_mask_preserves_other_fields(self, test_client, session): assert updated_pool.description is None # unchanged assert updated_pool.include_deferred is True # unchanged - def test_bulk_delete_query_count_is_independent_of_pool_count(self, test_client, session): + @pytest.mark.parametrize( + ("count_a", "count_b"), + [ + (5, 10), # testcase: pool count 5 compare with pool count 10 + (5, 20), # testcase: pool count 5 compare with pool count 20 + (10, 20), # testcase: pool count 10 compare with pool count 20 + ], + ) + def test_bulk_delete_query_count_is_independent_of_pool_count( + self, test_client, session, count_a, count_b + ): # Regression guard for the N+1 fix in BulkPoolService.handle_bulk_delete: # the query count for a bulk delete must be the same regardless of how # many pools are deleted. A regression that re-queries each pool inside # the loop would add one SELECT per pool, so the larger run would issue # strictly more queries than the smaller one. - counts: dict[int, int] = {} - for num_pools in (5, 10): + + def execute_and_count(num_pools: int) -> int: pool_names = [f"perf_pool_{num_pools}_{i}" for i in range(num_pools)] session.add_all(Pool(pool=name, slots=1, include_deferred=False) for name in pool_names) session.commit() request_body = { - "actions": [ - { - "action": "delete", - "entities": pool_names, - "action_on_non_existence": "fail", - } - ] + "actions": [{"action": "delete", "entities": pool_names, "action_on_non_existence": "fail"}] } with count_queries() as result: response = test_client.patch("/pools", json=request_body) - counts[num_pools] = sum(result.values()) assert response.status_code == 200 assert sorted(response.json()["delete"]["success"]) == sorted(pool_names) - remaining = session.scalars(select(Pool).where(Pool.pool.in_(pool_names))).all() - assert remaining == [] + assert session.scalars(select(Pool).where(Pool.pool.in_(pool_names))).all() == [] + + return sum(result.values()) + + queries_a = execute_and_count(count_a) + queries_b = execute_and_count(count_b) - assert counts[5] == counts[10], ( - f"Bulk-delete query count is not constant: {counts[5]} queries for 5 pools " - f"vs {counts[10]} queries for 10 pools. A regression that re-queries pools " - f"inside the loop would add one SELECT per pool." + assert queries_a == queries_b, ( + f"Bulk-delete query count is not constant! " + f"{queries_a} queries for {count_a} pools vs {queries_b} queries for {count_b} pools. " + f"A regression that re-queries pools inside the loop would add one SELECT per pool." ) def test_should_respond_401(self, unauthenticated_test_client): From 69fc441ed4922d5701f8b27b588526690796a717 Mon Sep 17 00:00:00 2001 From: ColtenOuO Date: Sun, 17 May 2026 17:50:10 +0000 Subject: [PATCH 4/4] Parametrize bulk-delete query-count test by pool size --- .../core_api/routes/public/test_pools.py | 45 ++++++++----------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py index e899632befd7b..4cd953d9518f4 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py @@ -1135,46 +1135,37 @@ def test_update_mask_preserves_other_fields(self, test_client, session): assert updated_pool.include_deferred is True # unchanged @pytest.mark.parametrize( - ("count_a", "count_b"), - [ - (5, 10), # testcase: pool count 5 compare with pool count 10 - (5, 20), # testcase: pool count 5 compare with pool count 20 - (10, 20), # testcase: pool count 10 compare with pool count 20 - ], + ("pool_count"), + [5, 10, 20], ) - def test_bulk_delete_query_count_is_independent_of_pool_count( - self, test_client, session, count_a, count_b - ): + def test_bulk_delete_query_count_is_independent_of_pool_count(self, test_client, session, pool_count): # Regression guard for the N+1 fix in BulkPoolService.handle_bulk_delete: # the query count for a bulk delete must be the same regardless of how # many pools are deleted. A regression that re-queries each pool inside # the loop would add one SELECT per pool, so the larger run would issue # strictly more queries than the smaller one. - def execute_and_count(num_pools: int) -> int: - pool_names = [f"perf_pool_{num_pools}_{i}" for i in range(num_pools)] - session.add_all(Pool(pool=name, slots=1, include_deferred=False) for name in pool_names) - session.commit() + EXPECTED_QUERY_COUNT = 4 - request_body = { - "actions": [{"action": "delete", "entities": pool_names, "action_on_non_existence": "fail"}] - } + pool_names = [f"perf_pool_{pool_count}_{i}" for i in range(pool_count)] + session.add_all(Pool(pool=name, slots=1, include_deferred=False) for name in pool_names) + session.commit() - with count_queries() as result: - response = test_client.patch("/pools", json=request_body) + request_body = { + "actions": [{"action": "delete", "entities": pool_names, "action_on_non_existence": "fail"}] + } - assert response.status_code == 200 - assert sorted(response.json()["delete"]["success"]) == sorted(pool_names) - assert session.scalars(select(Pool).where(Pool.pool.in_(pool_names))).all() == [] + with count_queries() as result: + response = test_client.patch("/pools", json=request_body) - return sum(result.values()) + assert response.status_code == 200 + assert sorted(response.json()["delete"]["success"]) == sorted(pool_names) + assert session.scalars(select(Pool).where(Pool.pool.in_(pool_names))).all() == [] - queries_a = execute_and_count(count_a) - queries_b = execute_and_count(count_b) + query_count = sum(result.values()) - assert queries_a == queries_b, ( - f"Bulk-delete query count is not constant! " - f"{queries_a} queries for {count_a} pools vs {queries_b} queries for {count_b} pools. " + assert query_count == EXPECTED_QUERY_COUNT, ( + f"Bulk-delete query count {query_count} does not match expected {EXPECTED_QUERY_COUNT}. " f"A regression that re-queries pools inside the loop would add one SELECT per pool." )