Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,11 @@ def safe_extract_from_orm(cls, data: Any) -> Any:
for field_name in cls.model_fields:
if field_name in insp.dict:
values[field_name] = insp.dict[field_name]
elif field_name == "state" and "_state" in insp.dict:
values["state"] = insp.dict["_state"]
elif field_name == "state":
if "_state" in insp.dict:
values["state"] = insp.dict["_state"]
elif not insp.detached and (state_val := data._state) is not None:
values["state"] = state_val

if "consumed_asset_events" not in values:
values["consumed_asset_events"] = []
Expand Down
17 changes: 17 additions & 0 deletions airflow-core/tests/unit/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,23 @@ def test_task_render_handles_detached_dagrun(self, dag_maker, session):
with redirect_stdout(io.StringIO()):
task_command.task_render(args)

@pytest.mark.db_test
def test_task_render_handles_expired_dagrun(self, dag_maker, session):
"""Test that model_validate extracts state from an expired DagRun instance."""
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun as DagRunPydantic
from airflow.utils.state import DagRunState

with dag_maker(dag_id="test_expired", session=session):
pass

dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
session.commit()
# After commit, SQLAlchemy expires all attributes — _state is no longer in insp.dict
# but the instance is still attached, so direct access triggers a lazy reload.

pydantic_dr = DagRunPydantic.model_validate(dr)
assert pydantic_dr.state == DagRunState.RUNNING

@pytest.mark.usefixtures("testing_dag_bundle")
def test_mapped_task_render(self):
"""
Expand Down
Loading