From b0e1530a45fec62c536b7641890d0e40474d83f5 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Thu, 28 May 2026 23:48:44 +0200 Subject: [PATCH 1/2] Apply per-file authorization to dag-source endpoint A single source file can define multiple Dags. The /dagSources/{dag_id} endpoint previously returned the file's full source code as soon as the caller had CODE access to dag_id, even when the caller was not authorized to read every other Dag defined in the same file. Apply the same per-file authorization overlay already used by the import-errors endpoint (apache/airflow#65329): enumerate the Dags sharing the (relative_fileloc, bundle_name) of the requested Dag, intersect with the caller's readable Dag set, and redact the source when any co-located Dag is not readable. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../core_api/routes/public/dag_sources.py | 33 ++++++++++- .../routes/public/test_dag_sources.py | 59 +++++++++++++++++++ 2 files changed, 90 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_sources.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_sources.py index 26920007672ad..6723938ec1bfe 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_sources.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_sources.py @@ -17,7 +17,9 @@ from __future__ import annotations from fastapi import Depends, HTTPException, Response, status +from sqlalchemy import select +from airflow.api_fastapi.app import get_auth_manager from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity from airflow.api_fastapi.common.db.common import SessionDep from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText @@ -25,9 +27,11 @@ from airflow.api_fastapi.common.types import Mimetype from airflow.api_fastapi.core_api.datamodels.dag_sources import DAGSourceResponse from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.api_fastapi.core_api.security import requires_access_dag +from airflow.api_fastapi.core_api.security import GetUserDep, requires_access_dag +from airflow.models import DagModel from airflow.models.dag_version import DagVersion +REDACTED_SOURCE = "REDACTED - you do not have read permission on all Dags in the file" dag_sources_router = AirflowRouter(tags=["DagSource"], prefix="/dagSources") @@ -55,6 +59,7 @@ def get_dag_source( accept: HeaderAcceptJsonOrText, dag_id: str, session: SessionDep, + user: GetUserDep, version_number: int | None = None, ): """Get source code using file token.""" @@ -69,9 +74,33 @@ def get_dag_source( status.HTTP_404_NOT_FOUND, detail=f"Code not found. dag_id='{dag_id}' version_number='{version_number}'", ) + + # Per-file authorization overlay on top of the ``DagAccessEntity.CODE`` + # check above: a single source file may define multiple Dags, and the + # caller having CODE access to ``dag_id`` does not imply they may read + # every other Dag co-located in the file. Match the file by + # ``(relative_fileloc, bundle_name)`` -- the same keying + # ``import_error.py`` uses for its equivalent check -- and redact the + # response when any co-located Dag is not in the caller's readable set. + content = dag_version.dag_code.source_code + dag_model = dag_version.dag_model + if dag_model is not None and dag_model.relative_fileloc: + file_dag_ids = set( + session.scalars( + select(DagModel.dag_id).where( + DagModel.relative_fileloc == dag_model.relative_fileloc, + DagModel.bundle_name == dag_model.bundle_name, + ) + ).all() + ) + if file_dag_ids: + readable_dag_ids = get_auth_manager().get_authorized_dag_ids(user=user) + if not file_dag_ids.issubset(readable_dag_ids): + content = REDACTED_SOURCE + dag_source_model = DAGSourceResponse( dag_id=dag_id, - content=dag_version.dag_code.source_code, + content=content, version_number=dag_version.version_number, dag_display_name=dag_version.dag_model.dag_display_name, ) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py index 59970da27ea6b..e7690ea47d393 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py @@ -18,12 +18,15 @@ from __future__ import annotations import json +from unittest import mock import pendulum import pytest from httpx import Response from sqlalchemy import select +from airflow.api_fastapi.core_api.routes.public.dag_sources import REDACTED_SOURCE +from airflow.models import DagModel from airflow.models.dagbag import DBDagBag from airflow.models.dagcode import DagCode from airflow.utils.state import DagRunState @@ -180,3 +183,59 @@ def test_should_respond_404(self, test_client): url = f"{API_PREFIX}/{wrong_fileloc}" response = test_client.get(url, headers={"Accept": "application/json"}) assert response.status_code == 404 + + @pytest.fixture + def colocated_unreadable_dag(self, session, test_dag): + """Insert a second ``DagModel`` sharing the requested Dag's source file.""" + requested = session.scalar(select(DagModel).where(DagModel.dag_id == TEST_DAG_ID)) + colocated = DagModel( + dag_id="other_dag_in_same_file", + fileloc=requested.fileloc, + relative_fileloc=requested.relative_fileloc, + bundle_name=requested.bundle_name, + is_paused=False, + ) + session.add(colocated) + session.commit() + return colocated + + @mock.patch("airflow.api_fastapi.core_api.routes.public.dag_sources.get_auth_manager") + def test_source_is_redacted_when_caller_cannot_read_all_dags_in_file( + self, mock_get_auth_manager, test_client, test_dag, colocated_unreadable_dag + ): + mock_get_auth_manager.return_value.get_authorized_dag_ids.return_value = {TEST_DAG_ID} + + response: Response = test_client.get( + f"{API_PREFIX}/{TEST_DAG_ID}", headers={"Accept": "application/json"} + ) + + assert response.status_code == 200 + assert response.json() == { + "content": REDACTED_SOURCE, + "dag_id": TEST_DAG_ID, + "version_number": 1, + "dag_display_name": TEST_DAG_DISPLAY_NAME, + } + mock_get_auth_manager.return_value.get_authorized_dag_ids.assert_called_once_with(user=mock.ANY) + + @mock.patch("airflow.api_fastapi.core_api.routes.public.dag_sources.get_auth_manager") + def test_source_is_returned_when_caller_can_read_all_dags_in_file( + self, mock_get_auth_manager, test_client, test_dag, colocated_unreadable_dag + ): + mock_get_auth_manager.return_value.get_authorized_dag_ids.return_value = { + TEST_DAG_ID, + colocated_unreadable_dag.dag_id, + } + dag_content = self._get_dag_file_code(test_dag.fileloc) + + response: Response = test_client.get( + f"{API_PREFIX}/{TEST_DAG_ID}", headers={"Accept": "application/json"} + ) + + assert response.status_code == 200 + assert response.json() == { + "content": dag_content, + "dag_id": TEST_DAG_ID, + "version_number": 1, + "dag_display_name": TEST_DAG_DISPLAY_NAME, + } From bce61c5601a47fa97bdc0d30768a76764d775478 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 29 May 2026 00:19:46 +0200 Subject: [PATCH 2/2] Document per-file authorization boundary for dag-source endpoint Add a Security Model subsection that describes the per-Dag read scope the dag-source retrieval endpoint enforces, and the known limitation around historical-version retrieval: the per-Dag scope is evaluated against the current file membership, which may differ from the file's contents at the time the requested version was stored. Deployments that rely on per-Dag read scoping for source isolation should keep one Dag per source file, or restrict DagAccessEntity.CODE accordingly. Co-Authored-By: Claude Opus 4.7 (1M context) --- airflow-core/docs/security/security_model.rst | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/airflow-core/docs/security/security_model.rst b/airflow-core/docs/security/security_model.rst index 27294d1f99a4c..d69c19824d5c8 100644 --- a/airflow-core/docs/security/security_model.rst +++ b/airflow-core/docs/security/security_model.rst @@ -234,6 +234,31 @@ boundaries will be improved in future versions of Airflow. Until then, you shoul have access to all Dags and shared resources, and can modify their state regardless of team assignment. +Per-Dag read access and source-code retrieval +--------------------------------------------- + +The dag-source retrieval endpoint (``GET /api/v2/dagSources/{dag_id}``) honors +per-Dag read scoping for the **current** Dag-to-file mapping: if the file +backing the requested Dag also defines other Dags the caller is not authorized +to read, the endpoint returns a redacted placeholder instead of the source. + +The endpoint also supports retrieving historical source via the optional +``version_number`` query parameter. For historical versions, the per-Dag scope +is enforced using the **current** file membership, which may differ from the +file's contents at the time the requested version was stored. As a consequence, +requesting an older version may return source containing a Dag that has since +been removed from the file, even if the caller does not currently have read +access to that removed Dag. Conversely, requesting an older version may return +the redacted placeholder when a later-added co-located Dag is not in the +caller's readable set, even though the requested historical source predates +that addition. + +Deployments that rely on per-Dag read scoping for source isolation should +either keep one Dag per source file, or restrict ``DagAccessEntity.CODE`` to +roles that are trusted to read every Dag that has ever co-existed in any +source file. + + Security contexts for Dag author submitted code -----------------------------------------------