Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions airflow-core/src/airflow/settings.py
Original file line number Diff line number Diff line change
def create_async_metadata_engine(
    sql_alchemy_conn_async: str,
    *,
    connect_args: dict[str, Any],
    engine_args: dict[str, Any] | None = None,
) -> AsyncEngine:
    """
    Create the async SQLAlchemy Engine for the Airflow metadata database.

    Override in ``airflow_local_settings.py`` to customize async engine creation.
    For ``do_connect`` handlers, register on ``engine.sync_engine``.

    :param sql_alchemy_conn_async: Async connection URI for the metadata DB
        (e.g. ``postgresql+asyncpg://...`` or ``sqlite+aiosqlite://``).
    :param connect_args: Driver-level arguments forwarded to the DBAPI ``connect()`` call.
    :param engine_args: Pool and engine configuration (pool_size, pool_recycle, etc.).
        ``None`` means no extra engine arguments.
    """
    # engine_args is expanded before `future=True`; callers must not place
    # `future`, `connect_args`, or the URL inside engine_args or the call
    # would raise a duplicate-keyword TypeError.
    return create_async_engine(
        sql_alchemy_conn_async,
        connect_args=connect_args,
        **(engine_args or {}),
        future=True,
    )

Expand All @@ -402,9 +406,23 @@ def _configure_async_session() -> None:
AsyncSession = None
return

# Apply the same pool health settings used by the sync engine.
# Without these, the async pool uses SQLAlchemy defaults (pool_recycle=-1,
# pool_pre_ping=False) which means dead connections from PostgreSQL idle
# timeouts or pgbouncer disconnects are never detected.
engine_args: dict[str, Any] = {}
if not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
engine_args["poolclass"] = NullPool
elif not SQL_ALCHEMY_CONN_ASYNC.startswith("sqlite"):
engine_args["pool_size"] = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)
engine_args["pool_recycle"] = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)
engine_args["pool_pre_ping"] = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)
engine_args["max_overflow"] = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)

async_engine = create_async_metadata_engine(
SQL_ALCHEMY_CONN_ASYNC,
connect_args=_get_connect_args("async"),
engine_args=engine_args,
)
AsyncSession = async_sessionmaker(
bind=async_engine,
Expand Down
65 changes: 62 additions & 3 deletions airflow-core/tests/unit/core/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import pytest
from sqlalchemy.engine import Engine
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.pool import NullPool

from airflow import settings
from airflow.exceptions import AirflowClusterPolicyViolation, AirflowConfigException
Expand Down Expand Up @@ -262,18 +263,70 @@ def test_configure_orm_delegates_to_create_metadata_engine(self, mock_async_sess
def test_configure_async_session_delegates_to_create_async_metadata_engine(
    self, mock_create_async_engine
):
    """_configure_async_session() must call create_async_metadata_engine with no pool args for sqlite."""
    from airflow import settings

    mock_create_async_engine.return_value = MagicMock()

    with (
        patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", "sqlite+aiosqlite://"),
        patch("airflow.settings.conf") as mock_conf,
    ):
        # Pooling reported as enabled, but the sqlite backend should still
        # receive no pool sizing arguments.
        mock_conf.getboolean.return_value = True
        settings._configure_async_session()

    mock_create_async_engine.assert_called_once()
    positional, keyword = mock_create_async_engine.call_args
    assert positional[0] == "sqlite+aiosqlite://"
    assert "connect_args" in keyword
    # sqlite doesn't take pool sizing args, so engine_args must stay empty.
    assert keyword["engine_args"] == {}

@patch("airflow.settings.create_async_metadata_engine")
def test_configure_async_session_passes_pool_args_for_non_sqlite(self, mock_create_async_engine):
    """_configure_async_session() must pass pool configuration for non-sqlite backends."""
    from airflow import settings

    mock_create_async_engine.return_value = MagicMock()

    # Integer config values the fake `conf.getint` should hand back.
    configured_ints = {
        "SQL_ALCHEMY_POOL_SIZE": 10,
        "SQL_ALCHEMY_POOL_RECYCLE": 900,
        "SQL_ALCHEMY_MAX_OVERFLOW": 5,
    }

    def fake_getint(section, key, fallback=None):
        # Mirrors conf.getint(section, key, fallback=...) lookup semantics.
        return configured_ints.get(key, fallback)

    with (
        patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", "postgresql+asyncpg://localhost/airflow"),
        patch("airflow.settings.conf") as mock_conf,
    ):
        mock_conf.getint.side_effect = fake_getint
        mock_conf.getboolean.return_value = True

        settings._configure_async_session()

    engine_args = mock_create_async_engine.call_args[1]["engine_args"]
    assert engine_args["pool_size"] == 10
    assert engine_args["max_overflow"] == 5
    assert engine_args["pool_recycle"] == 900
    assert engine_args["pool_pre_ping"] is True

@patch("airflow.settings.create_async_metadata_engine")
def test_configure_async_session_uses_nullpool_when_pool_disabled(self, mock_create_async_engine):
    """_configure_async_session() must use NullPool when SQL_ALCHEMY_POOL_ENABLED is False."""
    from airflow import settings

    mock_create_async_engine.return_value = MagicMock()

    conn_patch = patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", "postgresql+asyncpg://localhost/airflow")
    conf_patch = patch("airflow.settings.conf")
    with conn_patch, conf_patch as mock_conf:
        # Pooling disabled: every getboolean lookup reports False.
        mock_conf.getboolean.return_value = False

        settings._configure_async_session()

    passed_engine_args = mock_create_async_engine.call_args[1]["engine_args"]
    # NullPool replaces pool sizing entirely, so no sizing keys may be present.
    assert passed_engine_args["poolclass"] is NullPool
    assert "pool_size" not in passed_engine_args

@patch("airflow.settings.create_async_metadata_engine")
def test_configure_async_session_skips_when_no_async_conn(self, mock_create_async_engine):
Expand Down Expand Up @@ -313,12 +366,18 @@ def test_default_create_async_metadata_engine_forwards_args(self, mock_sa_create

mock_sa_create_async.return_value = MagicMock()
connect_args = {"timeout": 30}
engine_args = {"pool_size": 5, "pool_recycle": 1800, "pool_pre_ping": True}

settings.create_async_metadata_engine("sqlite+aiosqlite://", connect_args=connect_args)
settings.create_async_metadata_engine(
"sqlite+aiosqlite://", connect_args=connect_args, engine_args=engine_args
)

mock_sa_create_async.assert_called_once_with(
"sqlite+aiosqlite://",
connect_args={"timeout": 30},
pool_size=5,
pool_recycle=1800,
pool_pre_ping=True,
future=True,
)

Expand Down
Loading