diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py index db306e189e3b4..5e3afa8b3c00a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py @@ -73,6 +73,7 @@ from airflow.api_fastapi.logging.decorators import action_logging from airflow.assets.manager import asset_manager from airflow.configuration import conf +from airflow.exceptions import ParamValidationError from airflow.models.asset import ( AssetAliasModel, AssetDagRunQueue, @@ -443,21 +444,24 @@ def materialize_asset( f"Dag with dag_id: '{dag_id}' does not allow asset materialization runs", ) - params = (body or MaterializeAssetBody()).validate_context(dag) - return dag.create_dagrun( - run_id=params["run_id"], - logical_date=params["logical_date"], - data_interval=params["data_interval"], - run_after=params["run_after"], - conf=params["conf"], - run_type=DagRunType.ASSET_MATERIALIZATION, - triggered_by=DagRunTriggeredByType.REST_API, - triggering_user_name=user.get_name(), - state=DagRunState.QUEUED, - partition_key=params["partition_key"], - note=params["note"], - session=session, - ) + try: + params = (body or MaterializeAssetBody()).validate_context(dag) + return dag.create_dagrun( + run_id=params["run_id"], + logical_date=params["logical_date"], + data_interval=params["data_interval"], + run_after=params["run_after"], + conf=params["conf"], + run_type=DagRunType.ASSET_MATERIALIZATION, + triggered_by=DagRunTriggeredByType.REST_API, + triggering_user_name=user.get_name(), + state=DagRunState.QUEUED, + partition_key=params["partition_key"], + note=params["note"], + session=session, + ) + except (ParamValidationError, ValueError) as e: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e @assets_router.get( diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py index 45ae5cbf61a11..298949fce1619 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py @@ -1609,6 +1609,19 @@ def test_should_respond_403_when_user_cannot_trigger_dag(self, test_client): user=mock.ANY, ) + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_should_respond_400_on_invalid_dag_run_id(self, test_client): + """A dag_run_id containing '..' triggers ValueError in DagRun.validate_run_id. + + It must surface as 400 BAD_REQUEST, not 500 INTERNAL_SERVER_ERROR. + """ + response = test_client.post( + "/assets/1/materialize", + json={"dag_run_id": "bad..id"}, + ) + assert response.status_code == 400 + assert "must not contain '..'" in response.json()["detail"] + class TestGetAssetQueuedEvents(TestQueuedEventEndpoint): @pytest.mark.usefixtures("time_freezer")