Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,18 @@ async def get_user_from_token(self, token: str) -> BaseUser:
log.error("Couldn't deserialize user from token, JWT token is not valid: %s", e)
raise InvalidTokenError(str(e))

def get_fastapi_middlewares(self) -> list[tuple[type, dict[str, Any]]]:
"""
Return middlewares the auth manager wants registered on the main FastAPI app.

Each entry is a ``(middleware_class, kwargs)`` tuple and is registered via
``app.add_middleware`` by the API server. Auth managers that need to intercept or
augment incoming requests (for example, the FAB auth manager attaching an
anonymous user to unauthenticated requests when ``[fab] auth_role_public`` is
configured) should override this method.
"""
return []

def generate_jwt(
self, user: T, *, expiration_time_in_seconds: int = conf.getint("api_auth", "jwt_expiration_time")
) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,14 @@ def is_authorized_hitl_task(self, *, assigned_users: set[str], user: SimpleAuthM
# Delegate to parent class for the actual authorization check
return super().is_authorized_hitl_task(assigned_users=assigned_users, user=user)

def get_fastapi_middlewares(self) -> list[tuple[type, dict[str, Any]]]:
"""Register the all-admins middleware when ``[core] simple_auth_manager_all_admins`` is set."""
if not conf.getboolean("core", "simple_auth_manager_all_admins"):
return []
from airflow.api_fastapi.auth.managers.simple.middleware import SimpleAllAdminMiddleware

return [(SimpleAllAdminMiddleware, {})]

def get_fastapi_app(self) -> FastAPI | None:
"""
Specify a sub FastAPI application specific to the auth manager.
Expand Down
7 changes: 3 additions & 4 deletions airflow-core/src/airflow/api_fastapi/core_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from fastapi.templating import Jinja2Templates

from airflow.api_fastapi.auth.tokens import get_signing_key
from airflow.configuration import conf
from airflow.exceptions import AirflowException

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -165,14 +164,14 @@ def init_error_handlers(app: FastAPI) -> None:


def init_middlewares(app: FastAPI) -> None:
from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.middlewares.refresh_token import JWTRefreshMiddleware
from airflow.api_fastapi.common.http_access_log import HttpAccessLogMiddleware

app.add_middleware(JWTRefreshMiddleware)
if conf.getboolean("core", "simple_auth_manager_all_admins"):
from airflow.api_fastapi.auth.managers.simple.middleware import SimpleAllAdminMiddleware

app.add_middleware(SimpleAllAdminMiddleware)
for middleware_cls, middleware_kwargs in get_auth_manager().get_fastapi_middlewares():
app.add_middleware(middleware_cls, **middleware_kwargs)

# GZipMiddleware must be inside HttpAccessLogMiddleware so that access logs capture
# the full end-to-end duration including compression time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,3 +367,13 @@ def test_is_authorized_hitl_task(self, auth_manager, all_admins, user_id, assign
def test_get_teams(self, auth_manager):
teams = auth_manager._get_teams()
assert teams == {"test", "marketing"}

@conf_vars({("core", "simple_auth_manager_all_admins"): "false"})
def test_get_fastapi_middlewares_disabled(self, auth_manager):
assert auth_manager.get_fastapi_middlewares() == []

@conf_vars({("core", "simple_auth_manager_all_admins"): "true"})
def test_get_fastapi_middlewares_enabled(self, auth_manager):
from airflow.api_fastapi.auth.managers.simple.middleware import SimpleAllAdminMiddleware

assert auth_manager.get_fastapi_middlewares() == [(SimpleAllAdminMiddleware, {})]
5 changes: 5 additions & 0 deletions airflow-core/tests/unit/api_fastapi/core_api/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ async def test_get_user_expired_token(self, mock_get_auth_manager):

auth_manager.get_user_from_token.assert_called_once_with(token_str)

async def test_resolve_user_from_token_no_token_raises(self):
"""No token always produces 401; public-user fallback is handled by middleware, not here."""
with pytest.raises(HTTPException, match="Not authenticated"):
await resolve_user_from_token(None)

@patch("airflow.api_fastapi.core_api.security.resolve_user_from_token")
async def test_get_user_with_request_state(self, mock_resolve_user_from_token):
user = Mock()
Expand Down
16 changes: 16 additions & 0 deletions providers/fab/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,22 @@ config:
type: integer
example: ~
default: "30"
auth_role_public:
description: |
Role that Anonymous (unauthenticated) users are granted. When set, the FAB auth manager
will allow access to the API server and UI without requiring a login, and anonymous
requests will be treated as members of the given role. Leave empty (the default) to
require authentication.

This replaces the previous ``AUTH_ROLE_PUBLIC`` setting in ``webserver_config.py``. When
both are set, this ``[fab] auth_role_public`` config takes precedence. Setting this
config also applies the equivalent ``AUTH_ROLE_PUBLIC`` to the Flask app used by the FAB
auth manager, so all FAB auth code paths (FastAPI-based API server and legacy Flask
views) honor it consistently.
version_added: 3.6.2
type: string
example: "Admin"
default: ""

auth-managers:
- airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,57 @@ def is_logged_in(self) -> bool:
"""Return whether the user is logged in."""
user = self.get_user()
return bool(
self.appbuilder
and self.appbuilder.app.config.get("AUTH_ROLE_PUBLIC", None)
or (not user.is_anonymous and user.is_active)
(self.appbuilder and self._get_auth_role_public()) or (not user.is_anonymous and user.is_active)
)

def _get_auth_role_public(self) -> str | None:
"""
Return the role granted to anonymous users, or ``None`` if public access is disabled.

``providers/fab/www/app.py`` copies ``[fab] auth_role_public`` from the Airflow config
into ``AUTH_ROLE_PUBLIC`` on the Flask app when the app is created, so reading the
Flask config here covers both the new config key and the legacy
``AUTH_ROLE_PUBLIC`` entry in ``webserver_config.py``.
"""
if self.appbuilder is not None:
return self.appbuilder.app.config.get("AUTH_ROLE_PUBLIC", None)
return None

@provide_session
def build_public_user(self, *, session: Session = NEW_SESSION) -> AnonymousUser | None:
"""
Build an :class:`AnonymousUser` pre-populated with the configured public role.

Returns ``None`` when neither ``[fab] auth_role_public`` nor the legacy
``AUTH_ROLE_PUBLIC`` entry in ``webserver_config.py`` is set.

The role and its permissions are resolved via the database so the caller does not
need a Flask app/request context to use the returned user (see #60897).

:meta private:
"""
public_role_name = self._get_auth_role_public()
if not public_role_name:
return None

user = AnonymousUser()
role = session.scalar(select(Role).where(Role.name == public_role_name))
if role is not None:
# ``AnonymousUser.roles``/``perms`` normally resolve lazily through Flask's
# ``current_app``. Writing ``_roles``/``_perms`` directly freezes a snapshot for
# the lifetime of a single FastAPI authorization check.
user._roles = {role}
user._perms = {(perm.action.name, perm.resource.name) for perm in role.permissions}
return user

def get_fastapi_middlewares(self) -> list[tuple[type, dict[str, Any]]]:
"""Register the FAB public-access middleware when public access is configured."""
if not self._get_auth_role_public():
return []
from airflow.providers.fab.auth_manager.middleware import FabAuthRolePublicMiddleware

return [(FabAuthRolePublicMiddleware, {})]

def create_token(self, headers: dict[str, str], body: dict[str, Any]) -> User | None:
"""
Create a new token from a payload.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING

from starlette.middleware.base import BaseHTTPMiddleware

from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.managers.base_auth_manager import COOKIE_NAME_JWT_TOKEN

if TYPE_CHECKING:
from fastapi import Request


class FabAuthRolePublicMiddleware(BaseHTTPMiddleware):
"""
Attach an anonymous user to unauthenticated requests when public access is enabled.

When ``[fab] auth_role_public`` (or the legacy ``AUTH_ROLE_PUBLIC`` entry in
``webserver_config.py``) is set, requests that do not carry any authentication
token get a pre-populated :class:`AnonymousUser` assigned to
``request.state.user``. The FastAPI ``get_user`` dependency picks it up before
attempting JWT validation, which would otherwise reject the request with 401.
"""

async def dispatch(self, request: Request, call_next):
if not self._has_auth_token(request):
public_user = get_auth_manager().build_public_user()
if public_user is not None:
request.state.user = public_user
return await call_next(request)

@staticmethod
def _has_auth_token(request: Request) -> bool:
auth_header = request.headers.get("authorization")
if auth_header:
return True
return bool(request.cookies.get(COOKIE_NAME_JWT_TOKEN))
9 changes: 9 additions & 0 deletions providers/fab/src/airflow/providers/fab/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ def remove_duplicate_date_header(response):
with flask_app.app_context():
flask_app.config.from_pyfile(webserver_config, silent=True)

# Bridge ``[fab] auth_role_public`` Airflow config into the Flask app config so legacy FAB
# code paths that read ``AUTH_ROLE_PUBLIC`` from ``current_app.config`` (e.g.
# ``AnonymousUser.roles``, basic_auth, kerberos_auth, security manager) stay in sync with
# the FastAPI-based auth flow. The Airflow config takes precedence over
# ``webserver_config.py`` when both are set so there is a single source of truth.
auth_role_public_conf = conf.get("fab", "auth_role_public", fallback="") or ""
if auth_role_public_conf:
flask_app.config["AUTH_ROLE_PUBLIC"] = auth_role_public_conf
Comment on lines +92 to +94
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, since we are doing that, there is no need to check for conf.get("fab", "auth_role_public") in other places?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — cleaned up. The only other caller was _get_auth_role_public in the FAB auth manager, and it now reads appbuilder.app.config["AUTH_ROLE_PUBLIC"] directly. The www/app.py bridge is the single place that reads the Airflow config, and everything else downstream reads the Flask config. There were no other conf.get("fab", "auth_role_public") call sites.


url = make_url(flask_app.config["SQLALCHEMY_DATABASE_URI"])
if url.drivername == "sqlite" and url.database and not isabs(url.database):
raise AirflowConfigException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,89 @@ def test_is_logged_in_with_inactive_user(self, mock_get_user, auth_manager_with_

assert auth_manager_with_appbuilder.is_logged_in() is False

@mock.patch.object(FabAuthManager, "get_user")
def test_is_logged_in_with_auth_role_public(self, mock_get_user, flask_app, auth_manager_with_appbuilder):
"""When ``AUTH_ROLE_PUBLIC`` is set on the Flask app, anonymous users are 'logged in'."""
user = Mock()
user.is_anonymous.return_value = True
user.is_active.return_value = False
mock_get_user.return_value = user

previous = flask_app.config.get("AUTH_ROLE_PUBLIC")
flask_app.config["AUTH_ROLE_PUBLIC"] = "Admin"
try:
assert auth_manager_with_appbuilder.is_logged_in() is True
finally:
flask_app.config["AUTH_ROLE_PUBLIC"] = previous

def test_build_public_user_returns_none_when_not_configured(
self, flask_app, auth_manager_with_appbuilder
):
"""Without ``AUTH_ROLE_PUBLIC`` set on the Flask app, there is no public user."""
previous = flask_app.config.get("AUTH_ROLE_PUBLIC")
flask_app.config["AUTH_ROLE_PUBLIC"] = None
try:
assert auth_manager_with_appbuilder.build_public_user() is None
finally:
flask_app.config["AUTH_ROLE_PUBLIC"] = previous

def test_build_public_user_returns_anonymous_user(self, flask_app, auth_manager_with_appbuilder):
"""``AUTH_ROLE_PUBLIC`` yields an :class:`AnonymousUser` with the role resolved from the DB."""
from airflow.providers.fab.auth_manager.models.anonymous_user import AnonymousUser

previous = flask_app.config.get("AUTH_ROLE_PUBLIC")
flask_app.config["AUTH_ROLE_PUBLIC"] = "Admin"
try:
user = auth_manager_with_appbuilder.build_public_user()
finally:
flask_app.config["AUTH_ROLE_PUBLIC"] = previous

assert isinstance(user, AnonymousUser)
assert len(user.roles) == 1
assert user.roles[0].name == "Admin"
# ``perms`` must be pre-populated so FastAPI can evaluate authorization outside a
# Flask request/app context.
assert user._perms, "Expected permissions to be pre-populated on the public user"

def test_build_public_user_with_unknown_role_returns_user_with_empty_perms(
self, flask_app, auth_manager_with_appbuilder
):
"""A misconfigured role name still yields an :class:`AnonymousUser` with empty perms."""
from airflow.providers.fab.auth_manager.models.anonymous_user import AnonymousUser

previous = flask_app.config.get("AUTH_ROLE_PUBLIC")
flask_app.config["AUTH_ROLE_PUBLIC"] = "DoesNotExist"
try:
user = auth_manager_with_appbuilder.build_public_user()
finally:
flask_app.config["AUTH_ROLE_PUBLIC"] = previous

assert isinstance(user, AnonymousUser)
# Role not found in the DB, so no pre-populated perms.
assert user._perms == set()

def test_get_fastapi_middlewares_disabled(self, flask_app, auth_manager_with_appbuilder):
"""No middleware is registered when public access is not configured."""
previous = flask_app.config.get("AUTH_ROLE_PUBLIC")
flask_app.config["AUTH_ROLE_PUBLIC"] = None
try:
assert auth_manager_with_appbuilder.get_fastapi_middlewares() == []
finally:
flask_app.config["AUTH_ROLE_PUBLIC"] = previous

def test_get_fastapi_middlewares_enabled(self, flask_app, auth_manager_with_appbuilder):
"""``FabAuthRolePublicMiddleware`` is registered when public access is configured."""
from airflow.providers.fab.auth_manager.middleware import FabAuthRolePublicMiddleware

previous = flask_app.config.get("AUTH_ROLE_PUBLIC")
flask_app.config["AUTH_ROLE_PUBLIC"] = "Admin"
try:
middlewares = auth_manager_with_appbuilder.get_fastapi_middlewares()
finally:
flask_app.config["AUTH_ROLE_PUBLIC"] = previous

assert middlewares == [(FabAuthRolePublicMiddleware, {})]

@pytest.mark.parametrize(
("auth_type", "method"),
[
Expand Down
Loading