diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py index a26a4413aadf3..9d139c883b399 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py @@ -158,6 +158,18 @@ async def get_user_from_token(self, token: str) -> BaseUser: log.error("Couldn't deserialize user from token, JWT token is not valid: %s", e) raise InvalidTokenError(str(e)) + def get_fastapi_middlewares(self) -> list[tuple[type, dict[str, Any]]]: + """ + Return middlewares the auth manager wants registered on the main FastAPI app. + + Each entry is a ``(middleware_class, kwargs)`` tuple and is registered via + ``app.add_middleware`` by the API server. Auth managers that need to intercept or + augment incoming requests (for example, the FAB auth manager attaching an + anonymous user to unauthenticated requests when ``[fab] auth_role_public`` is + configured) should override this method. + """ + return [] + def generate_jwt( self, user: T, *, expiration_time_in_seconds: int = conf.getint("api_auth", "jwt_expiration_time") ) -> str: diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py index a40915d5da81c..e84e18c42e978 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py @@ -319,6 +319,14 @@ def is_authorized_hitl_task(self, *, assigned_users: set[str], user: SimpleAuthM # Delegate to parent class for the actual authorization check return super().is_authorized_hitl_task(assigned_users=assigned_users, user=user) + def get_fastapi_middlewares(self) -> list[tuple[type, dict[str, Any]]]: + """Register the all-admins middleware when ``[core] simple_auth_manager_all_admins`` is set.""" + if not conf.getboolean("core", "simple_auth_manager_all_admins"): + return [] + from airflow.api_fastapi.auth.managers.simple.middleware import SimpleAllAdminMiddleware + + return [(SimpleAllAdminMiddleware, {})] + def get_fastapi_app(self) -> FastAPI | None: """ Specify a sub FastAPI application specific to the auth manager. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/app.py b/airflow-core/src/airflow/api_fastapi/core_api/app.py index 16719e3f6d5bd..c36bb6c978ffc 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/app.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/app.py @@ -29,7 +29,6 @@ from fastapi.templating import Jinja2Templates from airflow.api_fastapi.auth.tokens import get_signing_key -from airflow.configuration import conf from airflow.exceptions import AirflowException log = logging.getLogger(__name__) @@ -165,14 +164,14 @@ def init_error_handlers(app: FastAPI) -> None: def init_middlewares(app: FastAPI) -> None: + from airflow.api_fastapi.app import get_auth_manager from airflow.api_fastapi.auth.middlewares.refresh_token import JWTRefreshMiddleware from airflow.api_fastapi.common.http_access_log import HttpAccessLogMiddleware app.add_middleware(JWTRefreshMiddleware) - if conf.getboolean("core", "simple_auth_manager_all_admins"): - from airflow.api_fastapi.auth.managers.simple.middleware import SimpleAllAdminMiddleware - app.add_middleware(SimpleAllAdminMiddleware) + for middleware_cls, middleware_kwargs in get_auth_manager().get_fastapi_middlewares(): + app.add_middleware(middleware_cls, **middleware_kwargs) # GZipMiddleware must be inside HttpAccessLogMiddleware so that access logs capture # the full end-to-end duration including compression time. diff --git a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py index 1e9f195c8b715..b11fec3e508d2 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py +++ b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py @@ -367,3 +367,13 @@ def test_is_authorized_hitl_task(self, auth_manager, all_admins, user_id, assign def test_get_teams(self, auth_manager): teams = auth_manager._get_teams() assert teams == {"test", "marketing"} + + @conf_vars({("core", "simple_auth_manager_all_admins"): "false"}) + def test_get_fastapi_middlewares_disabled(self, auth_manager): + assert auth_manager.get_fastapi_middlewares() == [] + + @conf_vars({("core", "simple_auth_manager_all_admins"): "true"}) + def test_get_fastapi_middlewares_enabled(self, auth_manager): + from airflow.api_fastapi.auth.managers.simple.middleware import SimpleAllAdminMiddleware + + assert auth_manager.get_fastapi_middlewares() == [(SimpleAllAdminMiddleware, {})] diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py index e5cb5f17cf98e..0d05a0fdc78c3 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py @@ -110,6 +110,11 @@ async def test_get_user_expired_token(self, mock_get_auth_manager): auth_manager.get_user_from_token.assert_called_once_with(token_str) + async def test_resolve_user_from_token_no_token_raises(self): + """No token always produces 401; public-user fallback is handled by middleware, not here.""" + with pytest.raises(HTTPException, match="Not authenticated"): + await resolve_user_from_token(None) + @patch("airflow.api_fastapi.core_api.security.resolve_user_from_token") async def test_get_user_with_request_state(self, mock_resolve_user_from_token): user = Mock() diff --git a/providers/fab/provider.yaml b/providers/fab/provider.yaml index 5a246916abfdc..61f6ad65c360a 100644 --- a/providers/fab/provider.yaml +++ b/providers/fab/provider.yaml @@ -264,6 +264,22 @@ config: type: integer example: ~ default: "30" + auth_role_public: + description: | + Role that Anonymous (unauthenticated) users are granted. When set, the FAB auth manager + will allow access to the API server and UI without requiring a login, and anonymous + requests will be treated as members of the given role. Leave empty (the default) to + require authentication. + + This replaces the previous ``AUTH_ROLE_PUBLIC`` setting in ``webserver_config.py``. When + both are set, this ``[fab] auth_role_public`` config takes precedence. Setting this + config also applies the equivalent ``AUTH_ROLE_PUBLIC`` to the Flask app used by the FAB + auth manager, so all FAB auth code paths (FastAPI-based API server and legacy Flask + views) honor it consistently. + version_added: 3.6.2 + type: string + example: "Admin" + default: "" auth-managers: - airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py index 61bbf142dc2e2..c9fbf16281aa3 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py @@ -300,11 +300,57 @@ def is_logged_in(self) -> bool: """Return whether the user is logged in.""" user = self.get_user() return bool( - self.appbuilder - and self.appbuilder.app.config.get("AUTH_ROLE_PUBLIC", None) - or (not user.is_anonymous and user.is_active) + (self.appbuilder and self._get_auth_role_public()) or (not user.is_anonymous and user.is_active) ) + def _get_auth_role_public(self) -> str | None: + """ + Return the role granted to anonymous users, or ``None`` if public access is disabled. + + ``providers/fab/www/app.py`` copies ``[fab] auth_role_public`` from the Airflow config + into ``AUTH_ROLE_PUBLIC`` on the Flask app when the app is created, so reading the + Flask config here covers both the new config key and the legacy + ``AUTH_ROLE_PUBLIC`` entry in ``webserver_config.py``. + """ + if self.appbuilder is not None: + return self.appbuilder.app.config.get("AUTH_ROLE_PUBLIC", None) + return None + + @provide_session + def build_public_user(self, *, session: Session = NEW_SESSION) -> AnonymousUser | None: + """ + Build an :class:`AnonymousUser` pre-populated with the configured public role. + + Returns ``None`` when neither ``[fab] auth_role_public`` nor the legacy + ``AUTH_ROLE_PUBLIC`` entry in ``webserver_config.py`` is set. + + The role and its permissions are resolved via the database so the caller does not + need a Flask app/request context to use the returned user (see #60897). + + :meta private: + """ + public_role_name = self._get_auth_role_public() + if not public_role_name: + return None + + user = AnonymousUser() + role = session.scalar(select(Role).where(Role.name == public_role_name)) + if role is not None: + # ``AnonymousUser.roles``/``perms`` normally resolve lazily through Flask's + # ``current_app``. Writing ``_roles``/``_perms`` directly freezes a snapshot for + # the lifetime of a single FastAPI authorization check. + user._roles = {role} + user._perms = {(perm.action.name, perm.resource.name) for perm in role.permissions} + return user + + def get_fastapi_middlewares(self) -> list[tuple[type, dict[str, Any]]]: + """Register the FAB public-access middleware when public access is configured.""" + if not self._get_auth_role_public(): + return [] + from airflow.providers.fab.auth_manager.middleware import FabAuthRolePublicMiddleware + + return [(FabAuthRolePublicMiddleware, {})] + def create_token(self, headers: dict[str, str], body: dict[str, Any]) -> User | None: """ Create a new token from a payload. diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/middleware.py b/providers/fab/src/airflow/providers/fab/auth_manager/middleware.py new file mode 100644 index 0000000000000..8a160453f1f16 --- /dev/null +++ b/providers/fab/src/airflow/providers/fab/auth_manager/middleware.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from starlette.middleware.base import BaseHTTPMiddleware + +from airflow.api_fastapi.app import get_auth_manager +from airflow.api_fastapi.auth.managers.base_auth_manager import COOKIE_NAME_JWT_TOKEN + +if TYPE_CHECKING: + from fastapi import Request + + +class FabAuthRolePublicMiddleware(BaseHTTPMiddleware): + """ + Attach an anonymous user to unauthenticated requests when public access is enabled. + + When ``[fab] auth_role_public`` (or the legacy ``AUTH_ROLE_PUBLIC`` entry in + ``webserver_config.py``) is set, requests that do not carry any authentication + token get a pre-populated :class:`AnonymousUser` assigned to + ``request.state.user``. The FastAPI ``get_user`` dependency picks it up before + attempting JWT validation, which would otherwise reject the request with 401. + """ + + async def dispatch(self, request: Request, call_next): + if not self._has_auth_token(request): + public_user = get_auth_manager().build_public_user() + if public_user is not None: + request.state.user = public_user + return await call_next(request) + + @staticmethod + def _has_auth_token(request: Request) -> bool: + auth_header = request.headers.get("authorization") + if auth_header: + return True + return bool(request.cookies.get(COOKIE_NAME_JWT_TOKEN)) diff --git a/providers/fab/src/airflow/providers/fab/www/app.py b/providers/fab/src/airflow/providers/fab/www/app.py index 765d942ccc7a4..28bdae3418983 100644 --- a/providers/fab/src/airflow/providers/fab/www/app.py +++ b/providers/fab/src/airflow/providers/fab/www/app.py @@ -84,6 +84,15 @@ def remove_duplicate_date_header(response): with flask_app.app_context(): flask_app.config.from_pyfile(webserver_config, silent=True) + # Bridge ``[fab] auth_role_public`` Airflow config into the Flask app config so legacy FAB + # code paths that read ``AUTH_ROLE_PUBLIC`` from ``current_app.config`` (e.g. + # ``AnonymousUser.roles``, basic_auth, kerberos_auth, security manager) stay in sync with + # the FastAPI-based auth flow. The Airflow config takes precedence over + # ``webserver_config.py`` when both are set so there is a single source of truth. + auth_role_public_conf = conf.get("fab", "auth_role_public", fallback="") or "" + if auth_role_public_conf: + flask_app.config["AUTH_ROLE_PUBLIC"] = auth_role_public_conf + url = make_url(flask_app.config["SQLALCHEMY_DATABASE_URI"]) if url.drivername == "sqlite" and url.database and not isabs(url.database): raise AirflowConfigException( diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py index 99f7dc24edc23..6b253207079a7 100644 --- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py +++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py @@ -262,6 +262,89 @@ def test_is_logged_in_with_inactive_user(self, mock_get_user, auth_manager_with_ assert auth_manager_with_appbuilder.is_logged_in() is False + @mock.patch.object(FabAuthManager, "get_user") + def test_is_logged_in_with_auth_role_public(self, mock_get_user, flask_app, auth_manager_with_appbuilder): + """When ``AUTH_ROLE_PUBLIC`` is set on the Flask app, anonymous users are 'logged in'.""" + user = Mock() + user.is_anonymous.return_value = True + user.is_active.return_value = False + mock_get_user.return_value = user + + previous = flask_app.config.get("AUTH_ROLE_PUBLIC") + flask_app.config["AUTH_ROLE_PUBLIC"] = "Admin" + try: + assert auth_manager_with_appbuilder.is_logged_in() is True + finally: + flask_app.config["AUTH_ROLE_PUBLIC"] = previous + + def test_build_public_user_returns_none_when_not_configured( + self, flask_app, auth_manager_with_appbuilder + ): + """Without ``AUTH_ROLE_PUBLIC`` set on the Flask app, there is no public user.""" + previous = flask_app.config.get("AUTH_ROLE_PUBLIC") + flask_app.config["AUTH_ROLE_PUBLIC"] = None + try: + assert auth_manager_with_appbuilder.build_public_user() is None + finally: + flask_app.config["AUTH_ROLE_PUBLIC"] = previous + + def test_build_public_user_returns_anonymous_user(self, flask_app, auth_manager_with_appbuilder): + """``AUTH_ROLE_PUBLIC`` yields an :class:`AnonymousUser` with the role resolved from the DB.""" + from airflow.providers.fab.auth_manager.models.anonymous_user import AnonymousUser + + previous = flask_app.config.get("AUTH_ROLE_PUBLIC") + flask_app.config["AUTH_ROLE_PUBLIC"] = "Admin" + try: + user = auth_manager_with_appbuilder.build_public_user() + finally: + flask_app.config["AUTH_ROLE_PUBLIC"] = previous + + assert isinstance(user, AnonymousUser) + assert len(user.roles) == 1 + assert user.roles[0].name == "Admin" + # ``perms`` must be pre-populated so FastAPI can evaluate authorization outside a + # Flask request/app context. + assert user._perms, "Expected permissions to be pre-populated on the public user" + + def test_build_public_user_with_unknown_role_returns_user_with_empty_perms( + self, flask_app, auth_manager_with_appbuilder + ): + """A misconfigured role name still yields an :class:`AnonymousUser` with empty perms.""" + from airflow.providers.fab.auth_manager.models.anonymous_user import AnonymousUser + + previous = flask_app.config.get("AUTH_ROLE_PUBLIC") + flask_app.config["AUTH_ROLE_PUBLIC"] = "DoesNotExist" + try: + user = auth_manager_with_appbuilder.build_public_user() + finally: + flask_app.config["AUTH_ROLE_PUBLIC"] = previous + + assert isinstance(user, AnonymousUser) + # Role not found in the DB, so no pre-populated perms. + assert user._perms == set() + + def test_get_fastapi_middlewares_disabled(self, flask_app, auth_manager_with_appbuilder): + """No middleware is registered when public access is not configured.""" + previous = flask_app.config.get("AUTH_ROLE_PUBLIC") + flask_app.config["AUTH_ROLE_PUBLIC"] = None + try: + assert auth_manager_with_appbuilder.get_fastapi_middlewares() == [] + finally: + flask_app.config["AUTH_ROLE_PUBLIC"] = previous + + def test_get_fastapi_middlewares_enabled(self, flask_app, auth_manager_with_appbuilder): + """``FabAuthRolePublicMiddleware`` is registered when public access is configured.""" + from airflow.providers.fab.auth_manager.middleware import FabAuthRolePublicMiddleware + + previous = flask_app.config.get("AUTH_ROLE_PUBLIC") + flask_app.config["AUTH_ROLE_PUBLIC"] = "Admin" + try: + middlewares = auth_manager_with_appbuilder.get_fastapi_middlewares() + finally: + flask_app.config["AUTH_ROLE_PUBLIC"] = previous + + assert middlewares == [(FabAuthRolePublicMiddleware, {})] + @pytest.mark.parametrize( ("auth_type", "method"), [ diff --git a/providers/fab/tests/unit/fab/auth_manager/test_middleware.py b/providers/fab/tests/unit/fab/auth_manager/test_middleware.py new file mode 100644 index 0000000000000..e4117a0fe9198 --- /dev/null +++ b/providers/fab/tests/unit/fab/auth_manager/test_middleware.py @@ -0,0 +1,95 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from airflow.api_fastapi.auth.managers.base_auth_manager import COOKIE_NAME_JWT_TOKEN +from airflow.providers.fab.auth_manager.middleware import FabAuthRolePublicMiddleware + + +@pytest.mark.asyncio +class TestFabAuthRolePublicMiddleware: + def _make_request(self, headers: dict | None = None, cookies: dict | None = None) -> Mock: + request = Mock() + request.headers = headers or {} + request.cookies = cookies or {} + request.state = Mock(spec=[]) + return request + + @patch("airflow.providers.fab.auth_manager.middleware.get_auth_manager") + async def test_sets_public_user_when_no_auth_present(self, mock_get_auth_manager): + public_user = Mock(name="public_user") + auth_manager = Mock() + auth_manager.build_public_user = Mock(return_value=public_user) + mock_get_auth_manager.return_value = auth_manager + + middleware = FabAuthRolePublicMiddleware(app=Mock()) + request = self._make_request() + call_next = AsyncMock(return_value=Mock(name="response")) + + await middleware.dispatch(request, call_next) + + assert request.state.user is public_user + auth_manager.build_public_user.assert_called_once_with() + call_next.assert_awaited_once_with(request) + + @patch("airflow.providers.fab.auth_manager.middleware.get_auth_manager") + async def test_skips_when_authorization_header_present(self, mock_get_auth_manager): + auth_manager = Mock() + auth_manager.build_public_user = Mock() + mock_get_auth_manager.return_value = auth_manager + + middleware = FabAuthRolePublicMiddleware(app=Mock()) + request = self._make_request(headers={"authorization": "Bearer real-token"}) + call_next = AsyncMock(return_value=Mock(name="response")) + + await middleware.dispatch(request, call_next) + + auth_manager.build_public_user.assert_not_called() + assert not hasattr(request.state, "user") + + @patch("airflow.providers.fab.auth_manager.middleware.get_auth_manager") + async def test_skips_when_jwt_cookie_present(self, mock_get_auth_manager): + auth_manager = Mock() + auth_manager.build_public_user = Mock() + mock_get_auth_manager.return_value = auth_manager + + middleware = FabAuthRolePublicMiddleware(app=Mock()) + request = self._make_request(cookies={COOKIE_NAME_JWT_TOKEN: "cookie-token"}) + call_next = AsyncMock(return_value=Mock(name="response")) + + await middleware.dispatch(request, call_next) + + auth_manager.build_public_user.assert_not_called() + + @patch("airflow.providers.fab.auth_manager.middleware.get_auth_manager") + async def test_does_nothing_when_auth_manager_has_no_public_user(self, mock_get_auth_manager): + auth_manager = Mock() + auth_manager.build_public_user = Mock(return_value=None) + mock_get_auth_manager.return_value = auth_manager + + middleware = FabAuthRolePublicMiddleware(app=Mock()) + request = self._make_request() + call_next = AsyncMock(return_value=Mock(name="response")) + + await middleware.dispatch(request, call_next) + + assert not hasattr(request.state, "user") + call_next.assert_awaited_once_with(request)