Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions airflow-core/docs/security/security_model.rst
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,31 @@ boundaries will be improved in future versions of Airflow. Until then, you shoul
have access to all Dags and shared resources, and can modify their state regardless of team assignment.


Per-Dag read access and source-code retrieval
---------------------------------------------

The dag-source retrieval endpoint (``GET /api/v2/dagSources/{dag_id}``) honors
per-Dag read scoping for the **current** Dag-to-file mapping: if the file
backing the requested Dag also defines other Dags the caller is not authorized
to read, the endpoint returns a redacted placeholder instead of the source.

The endpoint also supports retrieving historical source via the optional
``version_number`` query parameter. For historical versions, the per-Dag scope
is enforced using the **current** file membership, which may differ from the
file's contents at the time the requested version was stored. As a consequence,
requesting an older version may return source containing a Dag that has since
been removed from the file, even if the caller does not currently have read
access to that removed Dag. Conversely, requesting an older version may return
the redacted placeholder when a later-added co-located Dag is not in the
caller's readable set, even though the requested historical source predates
that addition.

Deployments that rely on per-Dag read scoping for source isolation should
either keep one Dag per source file, or restrict ``DagAccessEntity.CODE`` to
roles that are trusted to read every Dag that has ever co-existed in any
source file.


Security contexts for Dag author submitted code
-----------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
from __future__ import annotations

from fastapi import Depends, HTTPException, Response, status
from sqlalchemy import select

from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.common.types import Mimetype
from airflow.api_fastapi.core_api.datamodels.dag_sources import DAGSourceResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import requires_access_dag
from airflow.api_fastapi.core_api.security import GetUserDep, requires_access_dag
from airflow.models import DagModel
from airflow.models.dag_version import DagVersion

REDACTED_SOURCE = "REDACTED - you do not have read permission on all Dags in the file"
dag_sources_router = AirflowRouter(tags=["DagSource"], prefix="/dagSources")


Expand Down Expand Up @@ -55,6 +59,7 @@ def get_dag_source(
accept: HeaderAcceptJsonOrText,
dag_id: str,
session: SessionDep,
user: GetUserDep,
version_number: int | None = None,
):
"""Get source code using file token."""
Expand All @@ -69,9 +74,33 @@ def get_dag_source(
status.HTTP_404_NOT_FOUND,
detail=f"Code not found. dag_id='{dag_id}' version_number='{version_number}'",
)

# Per-file authorization overlay on top of the ``DagAccessEntity.CODE``
# check above: a single source file may define multiple Dags, and the
# caller having CODE access to ``dag_id`` does not imply they may read
# every other Dag co-located in the file. Match the file by
# ``(relative_fileloc, bundle_name)`` -- the same keying
# ``import_error.py`` uses for its equivalent check -- and redact the
# response when any co-located Dag is not in the caller's readable set.
content = dag_version.dag_code.source_code
dag_model = dag_version.dag_model
if dag_model is not None and dag_model.relative_fileloc:
file_dag_ids = set(
session.scalars(
select(DagModel.dag_id).where(
DagModel.relative_fileloc == dag_model.relative_fileloc,
DagModel.bundle_name == dag_model.bundle_name,
)
).all()
)
if file_dag_ids:
readable_dag_ids = get_auth_manager().get_authorized_dag_ids(user=user)
if not file_dag_ids.issubset(readable_dag_ids):
content = REDACTED_SOURCE

dag_source_model = DAGSourceResponse(
dag_id=dag_id,
content=dag_version.dag_code.source_code,
content=content,
version_number=dag_version.version_number,
dag_display_name=dag_version.dag_model.dag_display_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
from __future__ import annotations

import json
from unittest import mock

import pendulum
import pytest
from httpx import Response
from sqlalchemy import select

from airflow.api_fastapi.core_api.routes.public.dag_sources import REDACTED_SOURCE
from airflow.models import DagModel
from airflow.models.dagbag import DBDagBag
from airflow.models.dagcode import DagCode
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -180,3 +183,59 @@ def test_should_respond_404(self, test_client):
url = f"{API_PREFIX}/{wrong_fileloc}"
response = test_client.get(url, headers={"Accept": "application/json"})
assert response.status_code == 404

@pytest.fixture
def colocated_unreadable_dag(self, session, test_dag):
"""Insert a second ``DagModel`` sharing the requested Dag's source file."""
requested = session.scalar(select(DagModel).where(DagModel.dag_id == TEST_DAG_ID))
colocated = DagModel(
dag_id="other_dag_in_same_file",
fileloc=requested.fileloc,
relative_fileloc=requested.relative_fileloc,
bundle_name=requested.bundle_name,
is_paused=False,
)
session.add(colocated)
session.commit()
return colocated

@mock.patch("airflow.api_fastapi.core_api.routes.public.dag_sources.get_auth_manager")
def test_source_is_redacted_when_caller_cannot_read_all_dags_in_file(
self, mock_get_auth_manager, test_client, test_dag, colocated_unreadable_dag
):
mock_get_auth_manager.return_value.get_authorized_dag_ids.return_value = {TEST_DAG_ID}

response: Response = test_client.get(
f"{API_PREFIX}/{TEST_DAG_ID}", headers={"Accept": "application/json"}
)

assert response.status_code == 200
assert response.json() == {
"content": REDACTED_SOURCE,
"dag_id": TEST_DAG_ID,
"version_number": 1,
"dag_display_name": TEST_DAG_DISPLAY_NAME,
}
mock_get_auth_manager.return_value.get_authorized_dag_ids.assert_called_once_with(user=mock.ANY)

@mock.patch("airflow.api_fastapi.core_api.routes.public.dag_sources.get_auth_manager")
def test_source_is_returned_when_caller_can_read_all_dags_in_file(
self, mock_get_auth_manager, test_client, test_dag, colocated_unreadable_dag
):
mock_get_auth_manager.return_value.get_authorized_dag_ids.return_value = {
TEST_DAG_ID,
colocated_unreadable_dag.dag_id,
}
dag_content = self._get_dag_file_code(test_dag.fileloc)

response: Response = test_client.get(
f"{API_PREFIX}/{TEST_DAG_ID}", headers={"Accept": "application/json"}
)

assert response.status_code == 200
assert response.json() == {
"content": dag_content,
"dag_id": TEST_DAG_ID,
"version_number": 1,
"dag_display_name": TEST_DAG_DISPLAY_NAME,
}
Loading