diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index 395f8281d69eb..e79dc16730510 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -335,8 +335,11 @@ def safe_extract_from_orm(cls, data: Any) -> Any: for field_name in cls.model_fields: if field_name in insp.dict: values[field_name] = insp.dict[field_name] - elif field_name == "state" and "_state" in insp.dict: - values["state"] = insp.dict["_state"] + elif field_name == "state": + if "_state" in insp.dict: + values["state"] = insp.dict["_state"] + elif not insp.detached and (state_val := data._state) is not None: + values["state"] = state_val if "consumed_asset_events" not in values: values["consumed_asset_events"] = [] diff --git a/airflow-core/tests/unit/cli/commands/test_task_command.py b/airflow-core/tests/unit/cli/commands/test_task_command.py index b66384d5ad426..f39a8409bc243 100644 --- a/airflow-core/tests/unit/cli/commands/test_task_command.py +++ b/airflow-core/tests/unit/cli/commands/test_task_command.py @@ -307,6 +307,23 @@ def test_task_render_handles_detached_dagrun(self, dag_maker, session): with redirect_stdout(io.StringIO()): task_command.task_render(args) + @pytest.mark.db_test + def test_task_render_handles_expired_dagrun(self, dag_maker, session): + """Test that model_validate extracts state from an expired DagRun instance.""" + from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun as DagRunPydantic + from airflow.utils.state import DagRunState + + with dag_maker(dag_id="test_expired", session=session): + pass + + dr = dag_maker.create_dagrun(state=DagRunState.RUNNING) + session.commit() + # After commit, SQLAlchemy expires all attributes — _state is no longer in insp.dict + # but the instance is still attached, so direct access triggers a lazy reload. + + pydantic_dr = DagRunPydantic.model_validate(dr) + assert pydantic_dr.state == DagRunState.RUNNING + @pytest.mark.usefixtures("testing_dag_bundle") def test_mapped_task_render(self): """