Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion airflow-core/docs/core-concepts/auth-manager/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ These authorization methods are:
Also, ``is_authorized_dag`` is called for any entity related to Dags (e.g. task instances, Dag runs, ...). This information is passed in ``access_entity``.
Example: ``auth_manager.is_authorized_dag(method="GET", access_entity=DagAccessEntity.Run, details=DagDetails(id="dag-1"))`` asks
whether the user has permission to read the Dag runs of the Dag "dag-1".
* ``is_authorized_backfill``: Return whether the user is authorized to access Airflow backfills. Some details about the backfill can be provided (e.g. the backfill ID).
* ``is_authorized_asset``: Return whether the user is authorized to access Airflow assets. Some details about the asset can be provided (e.g. the asset ID).
* ``is_authorized_asset_alias``: Return whether the user is authorized to access Airflow asset aliases. Some details about the asset alias can be provided (e.g. the asset alias ID).
* ``is_authorized_pool``: Return whether the user is authorized to access Airflow pools. Some details about the pool can be provided (e.g. the pool name).
Expand Down
20 changes: 20 additions & 0 deletions airflow-core/newsfragments/61400.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
AuthManager Backfill permissions are now handled by the ``requires_access_dag`` on the ``DagAccessEntity.Run``

``is_authorized_backfill`` of the ``BaseAuthManager`` interface has been removed. Core will no longer call this method and their
provider counterpart implementation will be marked as deprecated.
Permissions for backfill operations are now checked against the ``DagAccessEntity.Run`` permission using the existing
``requires_access_dag`` decorator. In other words, if a user has permission to run a DAG, they can perform backfill operations on it.

Please update your security policies to ensure that users who need to perform backfill operations have the appropriate ``DagAccessEntity.Run`` permissions. (Users
having the Backfill permissions without having the DagRun ones will no longer be able to perform backfill operations without any update)

* Types of change

* [ ] Dag changes
* [ ] Config changes
* [x] API changes
* [ ] CLI changes
* [x] Behaviour changes
* [ ] Plugin changes
* [ ] Dependency changes
* [ ] Code interface changes
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
from airflow.api_fastapi.auth.managers.models.resource_details import (
BackfillDetails,
ConnectionDetails,
DagDetails,
PoolDetails,
Expand Down Expand Up @@ -230,22 +229,6 @@ def is_authorized_dag(
:param details: optional details about the DAG
"""

@abstractmethod
def is_authorized_backfill(
self,
*,
method: ResourceMethod,
user: T,
details: BackfillDetails | None = None,
) -> bool:
"""
Return whether the user is authorized to perform a given action on a backfill.

:param method: the method to perform
:param user: the user to performing the action
:param details: optional details about the backfill
"""

@abstractmethod
def is_authorized_asset(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ class DagDetails:

@dataclass
class BackfillDetails:
"""Represents the details of a backfill."""
"""
Represents the details of a backfill.
.. deprecated:: 3.1.8
Use DagAccessEntity.Run instead for a dag level access control.
"""

id: NonNegativeInt | None = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX
from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
from airflow.api_fastapi.auth.managers.models.resource_details import BackfillDetails, TeamDetails
from airflow.api_fastapi.auth.managers.models.resource_details import TeamDetails
from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser
from airflow.api_fastapi.common.types import MenuItem
from airflow.configuration import AIRFLOW_HOME, conf
Expand Down Expand Up @@ -194,20 +194,6 @@ def is_authorized_dag(
user=user,
)

def is_authorized_backfill(
self,
*,
method: ResourceMethod,
user: SimpleAuthManagerUser,
details: BackfillDetails | None = None,
) -> bool:
return self._is_authorized(
method=method,
allow_get_role=SimpleAuthManagerRole.VIEWER,
allow_role=SimpleAuthManagerRole.OP,
user=user,
)

def is_authorized_asset(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from sqlalchemy.orm import joinedload

from airflow._shared.timezones import timezone
from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import (
SessionDep,
paginated_select,
Expand All @@ -42,7 +41,7 @@
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
from airflow.api_fastapi.core_api.security import GetUserDep, requires_access_backfill, requires_access_dag
from airflow.api_fastapi.core_api.security import GetUserDep, requires_access_backfill
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import DagNotFound
from airflow.models import DagRun
Expand Down Expand Up @@ -121,7 +120,6 @@ def get_backfill(
dependencies=[
Depends(action_logging()),
Depends(requires_access_backfill(method="PUT")),
Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)),
],
)
def pause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> BackfillResponse:
Expand Down Expand Up @@ -149,7 +147,6 @@ def pause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> Backfill
dependencies=[
Depends(action_logging()),
Depends(requires_access_backfill(method="PUT")),
Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)),
],
)
def unpause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> BackfillResponse:
Expand All @@ -175,7 +172,6 @@ def unpause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> Backfi
),
dependencies=[
Depends(action_logging()),
Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)),
Depends(requires_access_backfill(method="PUT")),
],
)
Expand Down Expand Up @@ -222,7 +218,6 @@ def cancel_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> Backfil
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
dependencies=[
Depends(action_logging()),
Depends(requires_access_dag(method="POST", access_entity=DagAccessEntity.RUN)),
Depends(requires_access_backfill(method="POST")),
],
)
Expand Down Expand Up @@ -270,7 +265,6 @@ def create_backfill(
path="/dry_run",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
dependencies=[
Depends(requires_access_dag(method="POST", access_entity=DagAccessEntity.RUN)),
Depends(requires_access_backfill(method="POST")),
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
from airflow.api_fastapi.core_api.security import requires_access_backfill, requires_access_dag
from airflow.api_fastapi.core_api.security import requires_access_backfill
from airflow.models.backfill import Backfill

backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills")
Expand All @@ -47,7 +47,6 @@
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[
Depends(requires_access_backfill(method="GET")),
Depends(requires_access_dag(method="GET")),
],
)
def list_backfills_ui(
Expand Down
59 changes: 43 additions & 16 deletions airflow-core/src/airflow/api_fastapi/core_api/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
# under the License.
from __future__ import annotations

from collections.abc import Callable
from collections.abc import Callable, Coroutine
from json import JSONDecodeError
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, cast
from typing import TYPE_CHECKING, Annotated, Any, cast
from urllib.parse import ParseResult, unquote, urljoin, urlparse

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer, OAuth2PasswordBearer
from jwt import ExpiredSignatureError, InvalidTokenError
from pydantic import NonNegativeInt
from sqlalchemy import or_
from sqlalchemy import or_, select
from sqlalchemy.orm import Session

from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.managers.base_auth_manager import (
Expand All @@ -42,14 +43,14 @@
AccessView,
AssetAliasDetails,
AssetDetails,
BackfillDetails,
ConfigurationDetails,
ConnectionDetails,
DagAccessEntity,
DagDetails,
PoolDetails,
VariableDetails,
)
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.core_api.base import OrmClause
from airflow.api_fastapi.core_api.datamodels.common import (
BulkAction,
Expand All @@ -64,6 +65,7 @@
from airflow.api_fastapi.core_api.datamodels.variables import VariableBody
from airflow.configuration import conf
from airflow.models import Connection, Pool, Variable
from airflow.models.backfill import Backfill
from airflow.models.dag import DagModel, DagRun, DagTag
from airflow.models.dagwarning import DagWarning
from airflow.models.log import Log
Expand Down Expand Up @@ -146,14 +148,21 @@ async def get_user(


def requires_access_dag(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you cover these changes in unit tests as well?

method: ResourceMethod, access_entity: DagAccessEntity | None = None
method: ResourceMethod,
access_entity: DagAccessEntity | None = None,
param_dag_id: str | None = None,
) -> Callable[[Request, BaseUser], None]:
def inner(
request: Request,
user: GetUserDep,
) -> None:
dag_id = request.path_params.get("dag_id") or request.query_params.get("dag_id")
dag_id = dag_id if dag_id != "~" else None
# Required for the closure to capture the dag_id but still be able to mutate it.
# Prevent from using a nonlocal statement causing test failures.
dag_id = param_dag_id
if dag_id is None:
dag_id = request.path_params.get("dag_id") or request.query_params.get("dag_id")
dag_id = dag_id if dag_id != "~" else None

team_name = DagModel.get_team_name(dag_id) if dag_id else None

_requires_access(
Expand Down Expand Up @@ -263,17 +272,35 @@ def depends_permitted_dags_filter(
]


def requires_access_backfill(method: ResourceMethod) -> Callable[[Request, BaseUser], None]:
def inner(
def requires_access_backfill(
method: ResourceMethod,
) -> Callable[[Request, BaseUser, Session], Coroutine[Any, Any, None]]:
"""Wrap ``requires_access_dag`` and extract the dag_id from the backfill_id."""

async def inner(
request: Request,
user: GetUserDep,
session: SessionDep,
) -> None:
backfill_id: NonNegativeInt | None = request.path_params.get("backfill_id")

_requires_access(
is_authorized_callback=lambda: get_auth_manager().is_authorized_backfill(
method=method, details=BackfillDetails(id=backfill_id), user=user
),
dag_id = None

# Try to retrieve the dag_id from the backfill_id path param
backfill_id = request.path_params.get("backfill_id")
if backfill_id is not None:
backfill = session.scalars(select(Backfill).where(Backfill.id == backfill_id)).one_or_none()
dag_id = backfill.dag_id if backfill else None

# Try to retrieve the dag_id from the request body (POST backfill)
if dag_id is None:
try:
dag_id = (await request.json()).get("dag_id")
except JSONDecodeError:
# Not a json body, ignore
pass

requires_access_dag(method, DagAccessEntity.RUN, dag_id)(
request,
user,
)

return inner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ def test_serialize_user(self, auth_manager):
"is_authorized_dag",
"is_authorized_asset",
"is_authorized_asset_alias",
"is_authorized_backfill",
"is_authorized_pool",
"is_authorized_variable",
],
Expand Down Expand Up @@ -191,7 +190,6 @@ def test_is_authorized_view_methods(self, auth_manager, api, kwargs, role, resul
"is_authorized_connection",
"is_authorized_asset",
"is_authorized_asset_alias",
"is_authorized_backfill",
"is_authorized_pool",
"is_authorized_variable",
],
Expand Down Expand Up @@ -237,7 +235,6 @@ def test_is_authorized_methods_user_role_required(self, auth_manager, api, role,
"is_authorized_dag",
"is_authorized_asset",
"is_authorized_asset_alias",
"is_authorized_backfill",
"is_authorized_pool",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager, T
from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
from airflow.api_fastapi.auth.managers.models.resource_details import (
BackfillDetails,
ConnectionDetails,
DagDetails,
PoolDetails,
Expand Down Expand Up @@ -92,15 +91,6 @@ def is_authorized_dag(
) -> bool:
raise NotImplementedError()

def is_authorized_backfill(
self,
*,
method: ResourceMethod,
details: BackfillDetails | None = None,
user: BaseAuthManagerUserTest | None = None,
) -> bool:
raise NotImplementedError()

def is_authorized_asset(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def test_list_backfill(self, test_client, session):
session.add(b)
session.commit()

with assert_queries_count(2):
with assert_queries_count(3):
response = test_client.get(f"/backfills?dag_id={dag.dag_id}")

assert response.status_code == 200
Expand Down
Loading