Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import logging
import warnings
from contextlib import suppress
from functools import cached_property
Expand Down Expand Up @@ -121,6 +122,8 @@
RESOURCE_ASSET_ALIAS,
)

log = logging.getLogger(__name__)


_MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE: dict[DagAccessEntity, tuple[str, ...]] = {
DagAccessEntity.AUDIT_LOG: (RESOURCE_AUDIT_LOG,),
Expand Down Expand Up @@ -240,7 +243,10 @@ async def cleanup_session_middleware(request, call_next):
from airflow import settings

if settings.Session:
settings.Session.remove()
try:
settings.Session.remove()
except Exception:
log.warning("Failed to remove session during cleanup", exc_info=True)

app.mount("/", WSGIMiddleware(flask_app))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1110,3 +1110,84 @@ def test_session_cleanup_middleware_on_fastapi_route(self, mock_create_app):

# Verify Session.remove() was called
mock_session.remove.assert_called()


class TestFabAuthManagerSessionCleanupErrorHandling:
    """Test that cleanup_session_middleware handles Session.remove() failures gracefully.

    When the underlying database connection is dead (e.g., MySQL 'Server has gone away',
    PostgreSQL connection timeout), Session.remove() internally attempts a ROLLBACK which
    raises an OperationalError. The middleware should catch and log this error instead of
    propagating it as a 500 Internal Server Error to the client.
    """

    def _assert_session_remove_failure_is_logged(self, mock_create_app, error):
        """Issue one request while ``Session.remove()`` raises ``error`` and verify the cleanup.

        :param mock_create_app: the patched ``create_app`` injected by ``mock.patch``;
            it is configured to return a ``MagicMock`` Flask app.
        :param error: the exception instance ``Session.remove()`` should raise.
        """
        from fastapi.testclient import TestClient

        mock_create_app.return_value = MagicMock()

        fastapi_app = FabAuthManager().get_fastapi_app()
        # raise_server_exceptions=False: server-side errors are not re-raised into the
        # test, so the assertions below are what actually verify the middleware.
        client = TestClient(fastapi_app, raise_server_exceptions=False)

        with (
            mock.patch("airflow.settings.Session") as mock_session,
            mock.patch("airflow.providers.fab.auth_manager.fab_auth_manager.log") as mock_log,
        ):
            mock_session.remove.side_effect = error

            client.get("/users/list/")

            # The status code is deliberately not asserted: the mounted Flask app is a
            # MagicMock, so whatever response it produces is not meaningful here.
            mock_session.remove.assert_called()
            # The failure must be reported with the traceback attached, not swallowed.
            mock_log.warning.assert_any_call("Failed to remove session during cleanup", exc_info=True)

    @mock.patch("airflow.providers.fab.auth_manager.fab_auth_manager.create_app")
    def test_session_remove_db_error_does_not_propagate(self, mock_create_app):
        """When Session.remove() raises OperationalError, the error is caught and logged.

        Simulates MySQL 'Server has gone away' or similar DB errors during session
        cleanup. The middleware should catch the exception and log a warning.
        """
        from sqlalchemy.exc import OperationalError

        # Simulate MySQL 'Server has gone away' error on Session.remove()
        db_error = OperationalError("ROLLBACK", {}, Exception("(2006, 'Server has gone away')"))

        self._assert_session_remove_failure_is_logged(mock_create_app, db_error)

    @mock.patch("airflow.providers.fab.auth_manager.fab_auth_manager.create_app")
    def test_session_remove_generic_error_does_not_propagate(self, mock_create_app):
        """Any exception from Session.remove() should be caught during cleanup.

        This covers edge cases beyond OperationalError, such as an error raised
        when the session registry is in an unexpected state.
        """
        self._assert_session_remove_failure_is_logged(
            mock_create_app, RuntimeError("unexpected session error")
        )