From 3a99df8ab6003bd3fa4350c336bc600c31246e8e Mon Sep 17 00:00:00 2001 From: Dheeraj Turaga Date: Mon, 4 May 2026 16:51:11 -0500 Subject: [PATCH 1/2] [v3-2-test] Propagate triggering user to child DAG runs via TriggerDagRunOperator (#65747) Previously, when a task used TriggerDagRunOperator to trigger a child DAG, the child run's triggering_user_name was left unset and the UI displayed it as anonymous. The information was already captured on the parent DagRun but never forwarded through the Execution API. The POST /execution/dag-runs/{dag_id}/{run_id} endpoint now looks up the calling task instance's parent DagRun and forwards its triggering_user_name to the new child run. Chains of TriggerDagRunOperator runs now show the original human user end-to-end (cherry picked from commit f06df11b29292e6b9c023d3308092360eb06dec1) Co-authored-by: Dheeraj Turaga --- .../execution_api/routes/dag_runs.py | 10 +++++ .../versions/head/test_dag_runs.py | 41 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py index 69a06eca9b5e1..f1fd01dac7199 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py @@ -32,9 +32,12 @@ from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT from airflow.api_fastapi.execution_api.datamodels.dagrun import DagRunStateResponse, TriggerDAGRunPayload from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun +from airflow.api_fastapi.execution_api.datamodels.token import TIToken +from airflow.api_fastapi.execution_api.security import CurrentTIToken from airflow.exceptions import DagRunAlreadyExists from airflow.models.dag import DagModel from airflow.models.dagrun import DagRun as DagRunModel +from airflow.models.taskinstance import TaskInstance from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -94,6 +97,7 @@ def trigger_dag_run( run_id: str, payload: TriggerDAGRunPayload, session: SessionDep, + token: TIToken = CurrentTIToken, ) -> None: """Trigger a Dag run.""" dm = session.scalar(select(DagModel).where(~DagModel.is_stale, DagModel.dag_id == dag_id).limit(1)) @@ -123,6 +127,11 @@ def trigger_dag_run( }, ) + # Inherit triggering_user_name from the calling task's DagRun so chains of + # TriggerDagRunOperator preserve the original human user across child runs. + parent_ti = session.get(TaskInstance, token.id) + triggering_user_name = parent_ti.dag_run.triggering_user_name if parent_ti else None + try: trigger_dag( dag_id=dag_id, @@ -130,6 +139,7 @@ def trigger_dag_run( conf=payload.conf, logical_date=payload.logical_date, triggered_by=DagRunTriggeredByType.OPERATOR, + triggering_user_name=triggering_user_name, replace_microseconds=False, partition_key=payload.partition_key, note=payload.note, diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py index a29103139510c..5191770c5f31e 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py @@ -19,9 +19,12 @@ import pytest import time_machine +from fastapi import Request from sqlalchemy import select, update from airflow._shared.timezones import timezone +from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, TIToken +from airflow.api_fastapi.execution_api.security import require_auth from airflow.models import DagModel from airflow.models.dagrun import DagRun from airflow.providers.standard.operators.empty import EmptyOperator @@ -191,6 +194,44 @@ def test_trigger_dag_run_already_exists(self, client, session, dag_maker): } } + @pytest.mark.parametrize("parent_triggering_user_name", ["alice", None]) + def test_trigger_dag_run_inherits_triggering_user_name( + self, client, exec_app, session, dag_maker, parent_triggering_user_name + ): + """Child DAG run inherits triggering_user_name from the calling task's parent run.""" + parent_dag_id = "parent_dag_inherits" + parent_run_id = "parent_run" + child_dag_id = "child_dag_inherits" + child_run_id = "child_run" + logical_date = timezone.datetime(2025, 2, 20) + + with dag_maker(dag_id=parent_dag_id, session=session, serialized=True): + EmptyOperator(task_id="trigger_task") + parent_run = dag_maker.create_dagrun( + run_id=parent_run_id, triggering_user_name=parent_triggering_user_name + ) + parent_ti = parent_run.task_instances[0] + + with dag_maker(dag_id=child_dag_id, session=session, serialized=True): + EmptyOperator(task_id="child_task") + session.commit() + + async def auth_as_parent_ti(request: Request) -> TIToken: + return TIToken(id=parent_ti.id, claims=TIClaims(scope="execution")) + + exec_app.dependency_overrides[require_auth] = auth_as_parent_ti + try: + response = client.post( + f"/execution/dag-runs/{child_dag_id}/{child_run_id}", + json={"logical_date": logical_date.isoformat()}, + ) + finally: + exec_app.dependency_overrides.pop(require_auth, None) + + assert response.status_code == 204 + child_run = session.scalars(select(DagRun).where(DagRun.run_id == child_run_id)).one() + assert child_run.triggering_user_name == parent_triggering_user_name + class TestDagRunClear: def setup_method(self): From e5d66b948c1f0437908cf7bc4923ef4215cb5fac Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Tue, 12 May 2026 05:21:55 +0200 Subject: [PATCH 2/2] [v3-2-test] Drop TIClaims usage in backport test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The backport's test imports TIClaims from airflow.api_fastapi.execution_api.datamodels.token, but TIClaims was added on main by #63604 ("Validate task identity token claims with a typed schema") which was not backported to v3-2-test. On v3-2-test, TIToken.claims is still typed as `dict[str, Any]` (loose dict), so the test can construct it with a plain dict instead of the TIClaims model. Drops the TIClaims import and replaces TIClaims(scope="execution") with {"scope": "execution"}. This is the same shape of fix as #66743 — backport's test references a symbol that exists only on main. Inlining the adapted construction avoids a feature backport (#63604) just to satisfy a test fixture's typed argument. --- .../api_fastapi/execution_api/versions/head/test_dag_runs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py index 5191770c5f31e..8d25936f77be4 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py @@ -23,7 +23,7 @@ from sqlalchemy import select, update from airflow._shared.timezones import timezone -from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, TIToken +from airflow.api_fastapi.execution_api.datamodels.token import TIToken from airflow.api_fastapi.execution_api.security import require_auth from airflow.models import DagModel from airflow.models.dagrun import DagRun @@ -217,7 +217,7 @@ def test_trigger_dag_run_inherits_triggering_user_name( session.commit() async def auth_as_parent_ti(request: Request) -> TIToken: - return TIToken(id=parent_ti.id, claims=TIClaims(scope="execution")) + return TIToken(id=parent_ti.id, claims={"scope": "execution"}) exec_app.dependency_overrides[require_auth] = auth_as_parent_ti try: