Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,24 +188,20 @@ def handle_bulk_update(self, action: BulkUpdateAction[PoolBody], results: BulkAc
def handle_bulk_delete(self, action: BulkDeleteAction[PoolBody], results: BulkActionResponse) -> None:
"""Bulk delete pools."""
to_delete_pool_names = set(action.entities)
_, matched_pool_names, not_found_pool_names = self.categorize_pools(to_delete_pool_names)
existing_pools_dict, matched_pool_names, not_found_pool_names = self.categorize_pools(
to_delete_pool_names
)

try:
if action.action_on_non_existence == BulkActionNotOnExistence.FAIL and not_found_pool_names:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"The pools with these pool names: {not_found_pool_names} were not found.",
)
if action.action_on_non_existence == BulkActionNotOnExistence.SKIP:
delete_pool_names = matched_pool_names
else:
delete_pool_names = to_delete_pool_names

for pool_name in delete_pool_names:
existing_pool = self.session.scalar(select(Pool).where(Pool.pool == pool_name).limit(1))
if existing_pool:
self.session.delete(existing_pool)
results.success.append(pool_name)
for pool_name in matched_pool_names:
self.session.delete(existing_pools_dict[pool_name])
results.success.append(pool_name)

except HTTPException as e:
results.errors.append({"error": f"{e.detail}", "status_code": e.status_code})
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from airflow.models.team import Team
from airflow.utils.session import provide_session

from tests_common.test_utils.asserts import count_queries
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_pools, clear_db_teams
from tests_common.test_utils.logs import check_last_log
Expand Down Expand Up @@ -1133,6 +1134,41 @@ def test_update_mask_preserves_other_fields(self, test_client, session):
assert updated_pool.description is None # unchanged
assert updated_pool.include_deferred is True # unchanged

@pytest.mark.parametrize(
("pool_count"),
[5, 10, 20],
)
def test_bulk_delete_query_count_is_independent_of_pool_count(self, test_client, session, pool_count):
# Regression guard for the N+1 fix in BulkPoolService.handle_bulk_delete:
# the query count for a bulk delete must be the same regardless of how
# many pools are deleted. A regression that re-queries each pool inside
# the loop would add one SELECT per pool, so the larger run would issue
# strictly more queries than the smaller one.

EXPECTED_QUERY_COUNT = 4

pool_names = [f"perf_pool_{pool_count}_{i}" for i in range(pool_count)]
session.add_all(Pool(pool=name, slots=1, include_deferred=False) for name in pool_names)
session.commit()

request_body = {
"actions": [{"action": "delete", "entities": pool_names, "action_on_non_existence": "fail"}]
}

with count_queries() as result:
response = test_client.patch("/pools", json=request_body)

assert response.status_code == 200
assert sorted(response.json()["delete"]["success"]) == sorted(pool_names)
assert session.scalars(select(Pool).where(Pool.pool.in_(pool_names))).all() == []

query_count = sum(result.values())

assert query_count == EXPECTED_QUERY_COUNT, (
f"Bulk-delete query count {query_count} does not match expected {EXPECTED_QUERY_COUNT}. "
f"A regression that re-queries pools inside the loop would add one SELECT per pool."
)

def test_should_respond_401(self, unauthenticated_test_client):
response = unauthenticated_test_client.patch("/pools", json={})
assert response.status_code == 401
Expand Down
Loading