diff --git a/airflow-core/docs/core-concepts/auth-manager/index.rst b/airflow-core/docs/core-concepts/auth-manager/index.rst index 71db38330303a..2e787c9e7fb6b 100644 --- a/airflow-core/docs/core-concepts/auth-manager/index.rst +++ b/airflow-core/docs/core-concepts/auth-manager/index.rst @@ -136,7 +136,6 @@ These authorization methods are: Also, ``is_authorized_dag`` is called for any entity related to Dags (e.g. task instances, Dag runs, ...). This information is passed in ``access_entity``. Example: ``auth_manager.is_authorized_dag(method="GET", access_entity=DagAccessEntity.Run, details=DagDetails(id="dag-1"))`` asks whether the user has permission to read the Dag runs of the Dag "dag-1". -* ``is_authorized_backfill``: Return whether the user is authorized to access Airflow backfills. Some details about the backfill can be provided (e.g. the backfill ID). * ``is_authorized_asset``: Return whether the user is authorized to access Airflow assets. Some details about the asset can be provided (e.g. the asset ID). * ``is_authorized_asset_alias``: Return whether the user is authorized to access Airflow asset aliases. Some details about the asset alias can be provided (e.g. the asset alias ID). * ``is_authorized_pool``: Return whether the user is authorized to access Airflow pools. Some details about the pool can be provided (e.g. the pool name). diff --git a/airflow-core/newsfragments/61400.significant.rst b/airflow-core/newsfragments/61400.significant.rst new file mode 100644 index 0000000000000..7bdee4390a7ea --- /dev/null +++ b/airflow-core/newsfragments/61400.significant.rst @@ -0,0 +1,20 @@ +AuthManager Backfill permissions are now handled by the ``requires_access_dag`` on the ``DagAccessEntity.Run`` + +``is_authorized_backfill`` of the ``BaseAuthManager`` interface has been removed. Core will no longer call this method and their +provider counterpart implementation will be marked as deprecated. +Permissions for backfill operations are now checked against the ``DagAccessEntity.Run`` permission using the existing +``requires_access_dag`` decorator. In other words, if a user has permission to run a DAG, they can perform backfill operations on it. + +Please update your security policies to ensure that users who need to perform backfill operations have the appropriate ``DagAccessEntity.Run`` permissions. (Users +having the Backfill permissions without having the DagRun ones will no longer be able to perform backfill operations without any update) + +* Types of change + + * [ ] Dag changes + * [ ] Config changes + * [x] API changes + * [ ] CLI changes + * [x] Behaviour changes + * [ ] Plugin changes + * [ ] Dependency changes + * [ ] Code interface changes diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py index 5e98dc86fb087..12b975cd28afd 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py @@ -29,7 +29,6 @@ from airflow.api_fastapi.auth.managers.models.base_user import BaseUser from airflow.api_fastapi.auth.managers.models.resource_details import ( - BackfillDetails, ConnectionDetails, DagDetails, PoolDetails, @@ -230,22 +229,6 @@ def is_authorized_dag( :param details: optional details about the DAG """ - @abstractmethod - def is_authorized_backfill( - self, - *, - method: ResourceMethod, - user: T, - details: BackfillDetails | None = None, - ) -> bool: - """ - Return whether the user is authorized to perform a given action on a backfill. - - :param method: the method to perform - :param user: the user to performing the action - :param details: optional details about the backfill - """ - @abstractmethod def is_authorized_asset( self, diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py b/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py index 6a0041570a159..4ff380d2aedc5 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py @@ -48,7 +48,12 @@ class DagDetails: @dataclass class BackfillDetails: - """Represents the details of a backfill.""" + """ + Represents the details of a backfill. + + .. deprecated:: 3.1.8 + Use DagAccessEntity.Run instead for a dag level access control. + """ id: NonNegativeInt | None = None diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py index f9f9c395e3fe2..1bdbcc0a3a233 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py @@ -36,7 +36,7 @@ from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager -from airflow.api_fastapi.auth.managers.models.resource_details import BackfillDetails, TeamDetails +from airflow.api_fastapi.auth.managers.models.resource_details import TeamDetails from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser from airflow.api_fastapi.common.types import MenuItem from airflow.configuration import AIRFLOW_HOME, conf @@ -194,20 +194,6 @@ def is_authorized_dag( user=user, ) - def is_authorized_backfill( - self, - *, - method: ResourceMethod, - user: SimpleAuthManagerUser, - details: BackfillDetails | None = None, - ) -> bool: - return self._is_authorized( - method=method, - allow_get_role=SimpleAuthManagerRole.VIEWER, - allow_role=SimpleAuthManagerRole.OP, - user=user, - ) - def is_authorized_asset( self, *, diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py index adbac360247c0..b106248f136ed 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -25,7 +25,6 @@ from sqlalchemy.orm import joinedload from airflow._shared.timezones import timezone -from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity from airflow.api_fastapi.common.db.common import ( SessionDep, paginated_select, @@ -42,7 +41,7 @@ from airflow.api_fastapi.core_api.openapi.exceptions import ( create_openapi_http_exception_doc, ) -from airflow.api_fastapi.core_api.security import GetUserDep, requires_access_backfill, requires_access_dag +from airflow.api_fastapi.core_api.security import GetUserDep, requires_access_backfill from airflow.api_fastapi.logging.decorators import action_logging from airflow.exceptions import DagNotFound from airflow.models import DagRun @@ -121,7 +120,6 @@ def get_backfill( dependencies=[ Depends(action_logging()), Depends(requires_access_backfill(method="PUT")), - Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)), ], ) def pause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> BackfillResponse: @@ -149,7 +147,6 @@ def pause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> Backfill dependencies=[ Depends(action_logging()), Depends(requires_access_backfill(method="PUT")), - Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)), ], ) def unpause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> BackfillResponse: @@ -175,7 +172,6 @@ def unpause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> Backfi ), dependencies=[ Depends(action_logging()), - Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)), Depends(requires_access_backfill(method="PUT")), ], ) @@ -222,7 +218,6 @@ def cancel_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> Backfil responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]), dependencies=[ Depends(action_logging()), - Depends(requires_access_dag(method="POST", access_entity=DagAccessEntity.RUN)), Depends(requires_access_backfill(method="POST")), ], ) @@ -270,7 +265,6 @@ def create_backfill( path="/dry_run", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]), dependencies=[ - Depends(requires_access_dag(method="POST", access_entity=DagAccessEntity.RUN)), Depends(requires_access_backfill(method="POST")), ], ) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py index c4c7ffe79a7fc..32b2891b9543a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py @@ -36,7 +36,7 @@ from airflow.api_fastapi.core_api.openapi.exceptions import ( create_openapi_http_exception_doc, ) -from airflow.api_fastapi.core_api.security import requires_access_backfill, requires_access_dag +from airflow.api_fastapi.core_api.security import requires_access_backfill from airflow.models.backfill import Backfill backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills") @@ -47,7 +47,6 @@ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), dependencies=[ Depends(requires_access_backfill(method="GET")), - Depends(requires_access_dag(method="GET")), ], ) def list_backfills_ui( diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py b/airflow-core/src/airflow/api_fastapi/core_api/security.py index 875513c8b2e0d..1a3f195084f71 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/security.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py @@ -17,6 +17,7 @@ from __future__ import annotations from collections.abc import Callable +from json import JSONDecodeError from pathlib import Path from typing import TYPE_CHECKING, Annotated, cast from urllib.parse import ParseResult, unquote, urljoin, urlparse @@ -24,8 +25,8 @@ from fastapi import Depends, HTTPException, Request, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer, OAuth2PasswordBearer from jwt import ExpiredSignatureError, InvalidTokenError -from pydantic import NonNegativeInt -from sqlalchemy import or_ +from sqlalchemy import or_, select +from sqlalchemy.orm import Session from airflow.api_fastapi.app import get_auth_manager from airflow.api_fastapi.auth.managers.base_auth_manager import ( @@ -42,7 +43,6 @@ AccessView, AssetAliasDetails, AssetDetails, - BackfillDetails, ConfigurationDetails, ConnectionDetails, DagAccessEntity, @@ -50,6 +50,7 @@ PoolDetails, VariableDetails, ) +from airflow.api_fastapi.common.db.common import SessionDep from airflow.api_fastapi.core_api.base import OrmClause from airflow.api_fastapi.core_api.datamodels.common import ( BulkAction, @@ -64,6 +65,7 @@ from airflow.api_fastapi.core_api.datamodels.variables import VariableBody from airflow.configuration import conf from airflow.models import Connection, Pool, Variable +from airflow.models.backfill import Backfill from airflow.models.dag import DagModel, DagRun, DagTag from airflow.models.dagwarning import DagWarning from airflow.models.log import Log @@ -146,14 +148,19 @@ async def get_user( def requires_access_dag( - method: ResourceMethod, access_entity: DagAccessEntity | None = None + method: ResourceMethod, + access_entity: DagAccessEntity | None = None, + dag_id: str | None = None, ) -> Callable[[Request, BaseUser], None]: def inner( request: Request, user: GetUserDep, ) -> None: - dag_id = request.path_params.get("dag_id") or request.query_params.get("dag_id") - dag_id = dag_id if dag_id != "~" else None + nonlocal dag_id + if dag_id is None: + dag_id = request.path_params.get("dag_id") or request.query_params.get("dag_id") + dag_id = dag_id if dag_id != "~" else None + team_name = DagModel.get_team_name(dag_id) if dag_id else None _requires_access( @@ -263,17 +270,33 @@ def depends_permitted_dags_filter( ] -def requires_access_backfill(method: ResourceMethod) -> Callable[[Request, BaseUser], None]: - def inner( +def requires_access_backfill(method: ResourceMethod) -> Callable[[Request, BaseUser, Session], None]: + """Wrap ``requires_access_dag`` and extract the dag_id from the backfill_id.""" + + async def inner( request: Request, user: GetUserDep, + session: SessionDep, ) -> None: - backfill_id: NonNegativeInt | None = request.path_params.get("backfill_id") - - _requires_access( - is_authorized_callback=lambda: get_auth_manager().is_authorized_backfill( - method=method, details=BackfillDetails(id=backfill_id), user=user - ), + dag_id = None + + # Try to retrieve the dag_id from the backfill_id path param + backfill_id = request.path_params.get("backfill_id") + if backfill_id is not None: + backfill = session.scalars(select(Backfill).where(Backfill.id == backfill_id)).one_or_none() + dag_id = backfill.dag_id if backfill else None + + # Try to retrieve the dag_id from the request body (POST backfill) + if dag_id is None: + try: + dag_id = (await request.json()).get("dag_id") + except JSONDecodeError: + # Not a json body, ignore + pass + + requires_access_dag(method, DagAccessEntity.RUN, dag_id)( + request=request, + user=user, ) return inner diff --git a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py index 2c67f79a5b879..1356a9fe1e8b4 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py +++ b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py @@ -135,7 +135,6 @@ def test_serialize_user(self, auth_manager): "is_authorized_dag", "is_authorized_asset", "is_authorized_asset_alias", - "is_authorized_backfill", "is_authorized_pool", "is_authorized_variable", ], @@ -191,7 +190,6 @@ def test_is_authorized_view_methods(self, auth_manager, api, kwargs, role, resul "is_authorized_connection", "is_authorized_asset", "is_authorized_asset_alias", - "is_authorized_backfill", "is_authorized_pool", "is_authorized_variable", ], @@ -237,7 +235,6 @@ def test_is_authorized_methods_user_role_required(self, auth_manager, api, role, "is_authorized_dag", "is_authorized_asset", "is_authorized_asset_alias", - "is_authorized_backfill", "is_authorized_pool", ], ) diff --git a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py index e719ff4836b58..71769ef49a600 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py +++ b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py @@ -25,7 +25,6 @@ from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager, T from airflow.api_fastapi.auth.managers.models.base_user import BaseUser from airflow.api_fastapi.auth.managers.models.resource_details import ( - BackfillDetails, ConnectionDetails, DagDetails, PoolDetails, @@ -92,15 +91,6 @@ def is_authorized_dag( ) -> bool: raise NotImplementedError() - def is_authorized_backfill( - self, - *, - method: ResourceMethod, - details: BackfillDetails | None = None, - user: BaseAuthManagerUserTest | None = None, - ) -> bool: - raise NotImplementedError() - def is_authorized_asset( self, *, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py index ade10293ef6d9..c55e284de62fd 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py @@ -126,7 +126,7 @@ def test_list_backfill(self, test_client, session): session.add(b) session.commit() - with assert_queries_count(2): + with assert_queries_count(3): response = test_client.get(f"/backfills?dag_id={dag.dag_id}") assert response.status_code == 200 diff --git a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py index aa23b4f2634f3..4d570760462cf 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import warnings from collections import defaultdict from collections.abc import Sequence from functools import cached_property @@ -27,6 +28,7 @@ from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager from airflow.cli.cli_config import CLICommand +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities from airflow.providers.amazon.aws.auth_manager.avp.facade import ( AwsAuthManagerAmazonVerifiedPermissionsFacade, @@ -158,6 +160,12 @@ def is_authorized_dag( def is_authorized_backfill( self, *, method: ResourceMethod, user: AwsAuthManagerUser, details: BackfillDetails | None = None ) -> bool: + warnings.warn( + "Use ``is_authorized_dag`` on ``DagAccessEntity.RUN`` instead for a dag level access control.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + backfill_id = details.id if details else None return self.avp_facade.is_authorized( method=method, entity_type=AvpEntities.BACKFILL, user=user, entity_id=backfill_id diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py index aebd4fa6f149c..0ffcd6d0dcb3f 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +import warnings from functools import cached_property from pathlib import Path from typing import TYPE_CHECKING, Any @@ -51,7 +52,7 @@ ) from airflow.api_fastapi.common.types import ExtraMenuItem, MenuItem from airflow.configuration import conf -from airflow.exceptions import AirflowConfigException +from airflow.exceptions import AirflowConfigException, AirflowProviderDeprecationWarning from airflow.models import Connection, DagModel, Pool, Variable from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.fab.auth_manager.models import Permission, Role, User @@ -389,6 +390,11 @@ def is_authorized_backfill( user: User, details: BackfillDetails | None = None, ) -> bool: + warnings.warn( + "Use ``is_authorized_dag`` on ``DagAccessEntity.RUN`` instead for a dag level access control.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) return self._is_authorized(method=method, resource_type=RESOURCE_BACKFILL, user=user) def is_authorized_asset( diff --git a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py index dc8de7c25954e..1daca0ffd7429 100644 --- a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py +++ b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py @@ -19,6 +19,7 @@ import json import logging import time +import warnings from base64 import urlsafe_b64decode from typing import TYPE_CHECKING, Any from urllib.parse import urljoin @@ -32,6 +33,7 @@ from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager +from airflow.exceptions import AirflowProviderDeprecationWarning try: from airflow.api_fastapi.auth.managers.base_auth_manager import ExtendedResourceMethod @@ -220,6 +222,12 @@ def is_authorized_dag( def is_authorized_backfill( self, *, method: ResourceMethod, user: KeycloakAuthManagerUser, details: BackfillDetails | None = None ) -> bool: + warnings.warn( + "Use ``is_authorized_dag`` on ``DagAccessEntity.RUN`` instead for a dag level access control.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + backfill_id = str(details.id) if details else None return self._is_authorized( method=method, resource_type=KeycloakResource.BACKFILL, user=user, resource_id=backfill_id