Description
Apache Airflow version
main (development)
If "Other Airflow 3 version" selected, which one?
No response
What happened?
When TriggerDagRunOperator raises DagRunTriggerException, the task-sdk task runner handles it by asking the API server to trigger the target dag. If the target dag does not exist, the API server returns 404, which is re-raised as an error. Because that error is raised inside the except block that handles DagRunTriggerException, the state variable is never assigned, and the finally block then fails with UnboundLocalError.
When TriggerDagRunOperator tries to rerun a dag with the same run_id, the conflict status code returned by the API server is already handled. The 404 could be handled in a similar manner, but the problem does not seem limited to TriggerDagRunOperator and could be triggered by other code paths too. Another approach would be to initialize state with a sentinel value and emit stats only when it has been set.
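As a rough illustration of the first option (treating 404 the way the conflict response is already treated), here is a minimal sketch in plain Python. Everything in it is a hypothetical stand-in: `ErrorKind`, `ErrorResult`, `HttpStatusError`, `HANDLED_STATUSES` and `trigger` are not Airflow names, and the real client code may be structured quite differently.

```python
from dataclasses import dataclass
from enum import Enum


class ErrorKind(Enum):
    """Hypothetical stand-in for the error types the client can report."""

    DAGRUN_ALREADY_EXISTS = "dagrun_already_exists"
    DAG_NOT_FOUND = "dag_not_found"


@dataclass
class ErrorResult:
    """Hypothetical stand-in for an error response returned to the caller."""

    error: ErrorKind


class HttpStatusError(Exception):
    """Hypothetical stand-in for an HTTP error raised by the transport."""

    def __init__(self, status_code: int):
        super().__init__(f"server returned {status_code}")
        self.status_code = status_code


# Status codes the caller can act on: these are returned instead of raised.
HANDLED_STATUSES = {
    409: ErrorKind.DAGRUN_ALREADY_EXISTS,
    404: ErrorKind.DAG_NOT_FOUND,
}


def trigger(send_request):
    """Run send_request(); return "ok" or an ErrorResult for handled statuses."""
    try:
        send_request()
    except HttpStatusError as err:
        kind = HANDLED_STATUSES.get(err.status_code)
        if kind is None:
            raise  # any other status is still treated as unexpected
        return ErrorResult(error=kind)
    return "ok"


def missing_dag():
    # Simulates the API server answering 404 for an unknown dag_id.
    raise HttpStatusError(404)


result = trigger(missing_dag)
```

With this shape the caller receives a value it can turn into a task failure, rather than an exception escaping from inside the except block.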
_handle_trigger_dag_run raising a ServerResponseError
airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py
Lines 1265 to 1266 in fabc5c3
    except DagRunTriggerException as drte:
        msg, state = _handle_trigger_dag_run(drte, context, ti, log)
finally block accessing state that is not initialized
airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py
Lines 1323 to 1329 in fabc5c3
    finally:
        Stats.incr(
            f"ti.finish.{ti.dag_id}.{ti.task_id}.{state.value}",
            tags=stats_tags,
        )
        # Same metric with tagging
        Stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
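The interaction between the two excerpts can be reproduced without Airflow. The sketch below is a simplified model, not the actual task runner: `TriggerRequested`, `handle_trigger`, `run_unguarded`, `run_guarded` and `outcome` are hypothetical names, and the `emitted` list stands in for the Stats calls. It shows the failure, and what the sentinel approach would change.

```python
class TriggerRequested(Exception):
    """Stand-in for DagRunTriggerException."""


def handle_trigger():
    # Stand-in for _handle_trigger_dag_run failing on a 404 from the API server.
    raise RuntimeError("API_SERVER_ERROR: 404")


def run_unguarded(emitted):
    try:
        raise TriggerRequested()
    except TriggerRequested:
        state = handle_trigger()  # raises, so `state` is never bound
    finally:
        # Reading `state` here raises UnboundLocalError, which becomes the
        # top-level error; the RuntimeError survives only as its context.
        emitted.append(state.value)


def run_guarded(emitted):
    state = None  # sentinel: no final state has been determined yet
    try:
        raise TriggerRequested()
    except TriggerRequested:
        state = handle_trigger()
    finally:
        # Emit the metric only when a state was actually assigned.
        if state is not None:
            emitted.append(state.value)


def outcome(fn):
    """Return the name of the exception that escapes fn, plus emitted metrics."""
    emitted = []
    try:
        fn(emitted)
    except Exception as exc:
        return type(exc).__name__, emitted
    return None, emitted


unguarded = outcome(run_unguarded)
guarded = outcome(run_guarded)
```

In the guarded version the original error is what propagates, at the cost of emitting no finish metric for that run. Whether a missing metric or a default state such as failed is preferable is a design decision this sketch does not settle.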
::group::Log message source details sources=["/home/karthikeyan/airflow/logs/dag_id=test_non_existent_triggerdagrun/run_id=manual__2026-03-07T17:50:19.584252+00:00/task_id=trigger_dag/attempt=4.log"]
::endgroup::
[2026-03-07T18:05:25.663612Z] INFO - DAG bundles loaded: dags-folder-1, example_dags
[2026-03-07T18:05:25.664236Z] INFO - Filling up the DagBag from /home/karthikeyan/airflow/dags/non_existent_dagrun.py
[2026-03-07T18:05:25.717248Z] INFO - Task instance is in running state
[2026-03-07T18:05:25.717448Z] INFO - Previous state of the Task instance: TaskInstanceState.QUEUED
[2026-03-07T18:05:25.717663Z] INFO - Current task name:trigger_dag
[2026-03-07T18:05:25.717759Z] INFO - Dag name:test_non_existent_triggerdagrun
[2026-03-07T18:05:25.718240Z] INFO - Triggering Dag Run. trigger_dag_id=nonexistent_dag_id
[2026-03-07T18:05:25.734854Z] ERROR - Top level errorUnboundLocalError: cannot access local variable 'state' where it is not associated with a value
File /home/karthikeyan/stuff/python/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py, line 1827 in main
File /home/karthikeyan/stuff/python/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py, line 1325 in run
,AirflowRuntimeError: API_SERVER_ERROR: {'status_code': 404, 'message': 'Server returned error', 'detail': {'detail': {'reason': 'not_found', 'message': "Dag with dag_id: 'nonexistent_dag_id' not found"}}}
File /home/karthikeyan/stuff/python/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py, line 1266 in run
File /home/karthikeyan/stuff/python/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py, line 1396 in _handle_trigger_dag_run
File /home/karthikeyan/stuff/python/airflow/task-sdk/src/airflow/sdk/execution_time/comms.py, line 218 in send
File /home/karthikeyan/stuff/python/airflow/task-sdk/src/airflow/sdk/execution_time/comms.py, line 312 in _get_response
File /home/karthikeyan/stuff/python/airflow/task-sdk/src/airflow/sdk/execution_time/comms.py, line 299 in _from_frame
,DagRunTriggerException:
File /home/karthikeyan/stuff/python/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py, line 1236 in run
File /home/karthikeyan/stuff/python/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py, line 1653 in _execute_task
File /home/karthikeyan/stuff/python/airflow/task-sdk/src/airflow/sdk/bases/operator.py, line 443 in wrapper
File /home/karthikeyan/stuff/python/airflow/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py, line 269 in execute
File /home/karthikeyan/stuff/python/airflow/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py, line 297 in _trigger_dag_af_3
[2026-03-07T18:05:25.745036Z] WARNING - Process exited abnormally exit_code=1
What you think should happen instead?
No response
How to reproduce
- Trigger a dag with TriggerDagRunOperator and a target dag that doesn't exist.
from __future__ import annotations

from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sdk import dag


@dag()
def test_non_existent_triggerdagrun():
    trigger_dag = TriggerDagRunOperator(
        task_id="trigger_dag", trigger_dag_id="nonexistent_dag_id", wait_for_completion=True
    )


test_non_existent_triggerdagrun()

test case in tests/task_sdk/api/test_client.py
def test_trigger_dag_not_found(self):
    """Test that if the dag is not found, the client returns an error"""

    def handle_request(request: httpx.Request) -> httpx.Response:
        if request.url.path == "/dag-runs/test_nonexistent_dag/test_run_id":
            return httpx.Response(
                status_code=404,
                json={
                    "detail": {
                        "reason": "not_found",
                        "message": "Dag with dag_id: 'test_nonexistent_dag' not found",
                    }
                },
            )
        return httpx.Response(status_code=422)

    client = make_client(transport=httpx.MockTransport(handle_request))
    result = client.dag_runs.trigger(dag_id="test_nonexistent_dag", run_id="test_run_id")
    assert result == ErrorResponse(error=ErrorType.DAG_NOT_FOUND)

Operating System
Ubuntu 20.04
Versions of Apache Airflow Providers
No response
Deployment
Virtualenv installation
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct