From 3353f9ddf272a12682e2555bac0e63f9baa6981e Mon Sep 17 00:00:00 2001 From: Deepak Kumar Date: Sun, 24 May 2026 14:50:31 -0700 Subject: [PATCH] API: Return 400 instead of 500 from materialize_asset for invalid validation input The materialize_asset POST endpoint passed user input (dag_run_id, logical_date/data_interval pairing, partition_key) straight through to MaterializeAssetBody.validate_context() and dag.create_dagrun(), which raise ValueError / ParamValidationError on invalid input (e.g. dag_run_id containing '..', invalid partition_key type, logical_date/data_interval mismatch). Those exceptions escaped uncaught and were returned as 500 Internal Server Error, even though the route's OpenAPI spec already documents 400 for this case. Wrap the validate_context / create_dagrun calls in try/except and re-raise as HTTPException(400) with the validator's message in the detail, matching the sibling pattern in dag_run.trigger_dag_run. Regression test asserts dag_run_id='bad..id' returns 400, not 500. --- .../core_api/routes/public/assets.py | 34 +++++++++++-------- .../core_api/routes/public/test_assets.py | 13 +++++++ 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py index db306e189e3b4..5e3afa8b3c00a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py @@ -73,6 +73,7 @@ from airflow.api_fastapi.logging.decorators import action_logging from airflow.assets.manager import asset_manager from airflow.configuration import conf +from airflow.exceptions import ParamValidationError from airflow.models.asset import ( AssetAliasModel, AssetDagRunQueue, @@ -443,21 +444,24 @@ def materialize_asset( f"Dag with dag_id: '{dag_id}' does not allow asset materialization runs", ) - params = (body or MaterializeAssetBody()).validate_context(dag) - return dag.create_dagrun( - run_id=params["run_id"], - logical_date=params["logical_date"], - data_interval=params["data_interval"], - run_after=params["run_after"], - conf=params["conf"], - run_type=DagRunType.ASSET_MATERIALIZATION, - triggered_by=DagRunTriggeredByType.REST_API, - triggering_user_name=user.get_name(), - state=DagRunState.QUEUED, - partition_key=params["partition_key"], - note=params["note"], - session=session, - ) + try: + params = (body or MaterializeAssetBody()).validate_context(dag) + return dag.create_dagrun( + run_id=params["run_id"], + logical_date=params["logical_date"], + data_interval=params["data_interval"], + run_after=params["run_after"], + conf=params["conf"], + run_type=DagRunType.ASSET_MATERIALIZATION, + triggered_by=DagRunTriggeredByType.REST_API, + triggering_user_name=user.get_name(), + state=DagRunState.QUEUED, + partition_key=params["partition_key"], + note=params["note"], + session=session, + ) + except (ParamValidationError, ValueError) as e: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e @assets_router.get( diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py index 45ae5cbf61a11..298949fce1619 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py @@ -1609,6 +1609,19 @@ def test_should_respond_403_when_user_cannot_trigger_dag(self, test_client): user=mock.ANY, ) + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_should_respond_400_on_invalid_dag_run_id(self, test_client): + """A dag_run_id containing '..' triggers ValueError in DagRun.validate_run_id. + + It must surface as 400 BAD_REQUEST, not 500 INTERNAL_SERVER_ERROR. + """ + response = test_client.post( + "/assets/1/materialize", + json={"dag_run_id": "bad..id"}, + ) + assert response.status_code == 400 + assert "must not contain '..'" in response.json()["detail"] + class TestGetAssetQueuedEvents(TestQueuedEventEndpoint): @pytest.mark.usefixtures("time_freezer")