Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ def trigger_dag_run(
"message": f"A run already exists for Dag '{dag_id}' with run_id '{run_id}'",
},
)
except ValueError as e:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching all instances of ValueError is a bit too broad. Since it wraps full trigger_dag invocation, this could convert unrelated internal ValueErrors into 400 responses and mask real server-side issues.

Could you elaborate more on the scenarios under which you expected to encounter a ValueError? Maybe it is better to narrow this to that?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @SameerMesiah97 ,
I have tried to cover these ValueErrors https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/serialization/definitions/dag.py#L544-L581 from create_dagrun function which run inside trigger_dag function under trigger_dar_run API call. Because in the current time users see this error:

Top level error
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/task_runner.py", line 920, in run
    result = _execute_task(context=context, ti=ti, log=log)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/task_runner.py", line 1307, in _execute_task
    result = ctx.run(execute, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/bases/operator.py", line 416, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/standard/operators/trigger_dagrun.py", line 258, in execute
    self._trigger_dag_af_3(
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/standard/operators/trigger_dagrun.py", line 269, in _trigger_dag_af_3
    raise DagRunTriggerException(
airflow.exceptions.DagRunTriggerException

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/task_runner.py", line 1450, in main
    state, _, error = run(ti, context, log)
                      ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/composer/patches/metrics/monkey_patching/airflow_sdk_execution_time_task_runner.py", line 36, in wrapper
    state, msg, error = f(ti, *args, **kwargs)
                        ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/task_runner.py", line 940, in run
    msg, state = _handle_trigger_dag_run(drte, context, ti, log)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/task_runner.py", line 1053, in _handle_trigger_dag_run
    comms_msg = SUPERVISOR_COMMS.send(
                ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/comms.py", line 207, in send
    return self._get_response()
           ^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/comms.py", line 271, in _get_response
    return self._from_frame(frame)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/comms.py", line 258, in _from_frame
    raise AirflowRuntimeError(error=err)
airflow.sdk.exceptions.AirflowRuntimeError: API_SERVER_ERROR: {'status_code': 500, 'message': 'Server returned error', 'detail': {'message': 'Internal server error', 'correlation-id': '002dab06-ee5b-8be7-a321-7b13b5311915'}}

with 500 status code and unclear error message:

{'status_code': 500, 'message': 'Server returned error', 'detail': {'message': 'Internal server error', 'correlation-id': '002dab06-ee5b-8be7-a321-7b13b5311915'}}

raise HTTPException(
status.HTTP_400_BAD_REQUEST,
detail={
"reason": "value_error",
"message": str(e),
},
) from e


@router.post(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from __future__ import annotations

import re

import pytest
import time_machine
from fastapi import Request
Expand Down Expand Up @@ -260,6 +262,31 @@ async def auth_as_parent_ti(request: Request) -> TIToken:
child_run = session.scalars(select(DagRun).where(DagRun.run_id == child_run_id)).one()
assert child_run.triggering_user_name == parent_triggering_user_name

def test_trigger_dag_run_value_error(self, client, session, dag_maker):
"""Test that error is raised when a DAG Run has ValueError."""

dag_id = "test_trigger_dag_run_value_error"
run_id = "manual__{test_run_id}"
logical_date = timezone.datetime(2026, 5, 22)

with dag_maker(dag_id=dag_id, session=session, serialized=True):
EmptyOperator(task_id="test_task")

session.commit()

response = client.post(
f"/execution/dag-runs/{dag_id}/{run_id}",
json={"logical_date": logical_date.isoformat()},
)

detail_message = response.json()["detail"]["message"]
detail_reason = response.json()["detail"]["reason"]

pattern = r"The run_id provided '.*' does not match regex pattern '.*' or '.*'"
assert re.search(pattern, detail_message)
assert detail_reason == "value_error"
assert response.status_code == 400


class TestDagRunClear:
def setup_method(self):
Expand Down
Loading