diff --git a/airflow-core/src/airflow/settings.py b/airflow-core/src/airflow/settings.py index bf47f86a2c27a..6648714ad21dc 100644 --- a/airflow-core/src/airflow/settings.py +++ b/airflow-core/src/airflow/settings.py @@ -373,16 +373,20 @@ def create_async_metadata_engine( sql_alchemy_conn_async: str, *, connect_args: dict[str, Any], + engine_args: dict[str, Any] | None = None, ) -> AsyncEngine: """ Create the async SQLAlchemy Engine for the Airflow metadata database. Override in ``airflow_local_settings.py`` to customize async engine creation. For ``do_connect`` handlers, register on ``engine.sync_engine``. + + :param engine_args: Pool and engine configuration (pool_size, pool_recycle, etc.). """ return create_async_engine( sql_alchemy_conn_async, connect_args=connect_args, + **(engine_args or {}), future=True, ) @@ -402,9 +406,23 @@ def _configure_async_session() -> None: AsyncSession = None return + # Apply the same pool health settings used by the sync engine. + # Without these, the async pool uses SQLAlchemy defaults (pool_recycle=-1, + # pool_pre_ping=False) which means dead connections from PostgreSQL idle + # timeouts or pgbouncer disconnects are never detected. + engine_args: dict[str, Any] = {} + if not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"): + engine_args["poolclass"] = NullPool + elif not SQL_ALCHEMY_CONN_ASYNC.startswith("sqlite"): + engine_args["pool_size"] = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5) + engine_args["pool_recycle"] = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800) + engine_args["pool_pre_ping"] = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True) + engine_args["max_overflow"] = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10) + async_engine = create_async_metadata_engine( SQL_ALCHEMY_CONN_ASYNC, connect_args=_get_connect_args("async"), + engine_args=engine_args, ) AsyncSession = async_sessionmaker( bind=async_engine, diff --git a/airflow-core/tests/unit/core/test_settings.py b/airflow-core/tests/unit/core/test_settings.py index 22cb00b36bb0e..588567181c52e 100644 --- a/airflow-core/tests/unit/core/test_settings.py +++ b/airflow-core/tests/unit/core/test_settings.py @@ -27,6 +27,7 @@ import pytest from sqlalchemy.engine import Engine from sqlalchemy.ext.asyncio import AsyncEngine +from sqlalchemy.pool import NullPool from airflow import settings from airflow.exceptions import AirflowClusterPolicyViolation, AirflowConfigException @@ -262,18 +263,70 @@ def test_configure_orm_delegates_to_create_metadata_engine(self, mock_async_sess def test_configure_async_session_delegates_to_create_async_metadata_engine( self, mock_create_async_engine ): - """_configure_async_session() must call create_async_metadata_engine.""" + """_configure_async_session() must call create_async_metadata_engine with no pool args for sqlite.""" from airflow import settings mock_create_async_engine.return_value = MagicMock() - with patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", "sqlite+aiosqlite://"): + with ( + patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", "sqlite+aiosqlite://"), + patch("airflow.settings.conf") as mock_conf, + ): + # Pool enabled but sqlite -- pool args should be skipped + mock_conf.getboolean.return_value = True settings._configure_async_session() mock_create_async_engine.assert_called_once() call_kwargs = mock_create_async_engine.call_args assert call_kwargs[0][0] == "sqlite+aiosqlite://" assert "connect_args" in call_kwargs[1] + # sqlite doesn't support pool size args + assert call_kwargs[1]["engine_args"] == {} + + @patch("airflow.settings.create_async_metadata_engine") + def test_configure_async_session_passes_pool_args_for_non_sqlite(self, mock_create_async_engine): + """_configure_async_session() must pass pool configuration for non-sqlite backends.""" + from airflow import settings + + mock_create_async_engine.return_value = MagicMock() + + with ( + patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", "postgresql+asyncpg://localhost/airflow"), + patch("airflow.settings.conf") as mock_conf, + ): + mock_conf.getint.side_effect = lambda section, key, fallback=None: { + "SQL_ALCHEMY_POOL_SIZE": 10, + "SQL_ALCHEMY_POOL_RECYCLE": 900, + "SQL_ALCHEMY_MAX_OVERFLOW": 5, + }.get(key, fallback) + mock_conf.getboolean.return_value = True + + settings._configure_async_session() + + engine_args = mock_create_async_engine.call_args[1]["engine_args"] + assert engine_args["pool_size"] == 10 + assert engine_args["pool_recycle"] == 900 + assert engine_args["pool_pre_ping"] is True + assert engine_args["max_overflow"] == 5 + + @patch("airflow.settings.create_async_metadata_engine") + def test_configure_async_session_uses_nullpool_when_pool_disabled(self, mock_create_async_engine): + """_configure_async_session() must use NullPool when SQL_ALCHEMY_POOL_ENABLED is False.""" + from airflow import settings + + mock_create_async_engine.return_value = MagicMock() + + with ( + patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", "postgresql+asyncpg://localhost/airflow"), + patch("airflow.settings.conf") as mock_conf, + ): + mock_conf.getboolean.return_value = False + + settings._configure_async_session() + + engine_args = mock_create_async_engine.call_args[1]["engine_args"] + assert engine_args["poolclass"] is NullPool + assert "pool_size" not in engine_args @patch("airflow.settings.create_async_metadata_engine") def test_configure_async_session_skips_when_no_async_conn(self, mock_create_async_engine): @@ -313,12 +366,18 @@ def test_default_create_async_metadata_engine_forwards_args(self, mock_sa_create mock_sa_create_async.return_value = MagicMock() connect_args = {"timeout": 30} + engine_args = {"pool_size": 5, "pool_recycle": 1800, "pool_pre_ping": True} - settings.create_async_metadata_engine("sqlite+aiosqlite://", connect_args=connect_args) + settings.create_async_metadata_engine( + "sqlite+aiosqlite://", connect_args=connect_args, engine_args=engine_args + ) mock_sa_create_async.assert_called_once_with( "sqlite+aiosqlite://", connect_args={"timeout": 30}, + pool_size=5, + pool_recycle=1800, + pool_pre_ping=True, future=True, )