diff --git a/airflow-core/newsfragments/67372.bugfix.rst b/airflow-core/newsfragments/67372.bugfix.rst new file mode 100644 index 0000000000000..3cd5c2f6368a0 --- /dev/null +++ b/airflow-core/newsfragments/67372.bugfix.rst @@ -0,0 +1 @@ +Fix TaskInstanceHistory missing host/metadata when task is retried from a non-RUNNING state (e.g. killed externally or failed in queued/deferred status) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index c2d67926a21e8..340ff5fa891f9 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -570,6 +570,10 @@ def _create_ti_state_update_query_and_update_state( _handle_fail_fast_for_dag(ti=ti, dag_id=dag_id, session=session, dag_bag=dag_bag) elif isinstance(ti_patch_payload, TIRetryStatePayload): if ti is not None: + ti.end_date = ti_patch_payload.end_date + ti.set_duration() + if ti_patch_payload.rendered_map_index is not None: + ti._rendered_map_index = ti_patch_payload.rendered_map_index ti.prepare_db_for_next_try(session) # Store retry policy overrides so next_retry_datetime() can read them. # These are cleared when the task enters RUNNING (ti_run). diff --git a/airflow-core/src/airflow/models/taskinstancehistory.py b/airflow-core/src/airflow/models/taskinstancehistory.py index 4989b949a523a..9c976bca2d097 100644 --- a/airflow-core/src/airflow/models/taskinstancehistory.py +++ b/airflow-core/src/airflow/models/taskinstancehistory.py @@ -211,7 +211,7 @@ def record_ti(ti: TaskInstance, session: Session = NEW_SESSION) -> None: ti_history_state = ti.state if ti.state not in State.finished: ti_history_state = TaskInstanceState.FAILED - ti.end_date = timezone.utcnow() + ti.end_date = ti.end_date or timezone.utcnow() ti.set_duration() ti_history = TaskInstanceHistory(ti, state=ti_history_state) session.add(ti_history) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 2d15e548a13a4..89338309952a6 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -1616,7 +1616,9 @@ def test_ti_update_state_handle_retry(self, client, session, create_task_instanc ti = create_task_instance( task_id="test_ti_update_state_to_retry", state=State.RUNNING, + hostname="random-hostname", ) + ti.start_date = DEFAULT_START_DATE session.commit() response = client.patch( @@ -1624,6 +1626,7 @@ def test_ti_update_state_handle_retry(self, client, session, create_task_instanc json={ "state": State.UP_FOR_RETRY, "end_date": DEFAULT_END_DATE.isoformat(), + "rendered_map_index": "retry_label_abc", }, ) @@ -1645,6 +1648,12 @@ def test_ti_update_state_handle_retry(self, client, session, create_task_instanc ).one() assert tih.task_instance_id assert tih.task_instance_id != ti.id + assert tih.state == State.FAILED + assert tih.hostname == "random-hostname" + assert tih.start_date == DEFAULT_START_DATE + assert tih.end_date == DEFAULT_END_DATE + assert tih.duration == 3600 + assert tih.rendered_map_index == "retry_label_abc" def test_ti_update_state_retry_with_policy_overrides(self, client, session, create_task_instance): """Test that retry_delay_seconds and retry_reason from a RetryPolicy are stored on the TI.""" diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 3580136ad70b1..7ddc0f567f995 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -4862,6 +4862,8 @@ def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, sessi ti = dr1.get_task_instances(session=session)[0] ti.state = adoptable_state ti.queued_by_job_id = old_job.id + ti.hostname = "random-hostname" + ti.start_date = DEFAULT_DATE old_ti_id = ti.id old_try_number = ti.try_number session.merge(ti) @@ -4873,19 +4875,18 @@ def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, sessi ti.refresh_from_db(session=session) assert ti.id != old_ti_id - assert ( - session.scalar( - select(TaskInstanceHistory).where( - TaskInstanceHistory.dag_id == ti.dag_id, - TaskInstanceHistory.task_id == ti.task_id, - TaskInstanceHistory.run_id == ti.run_id, - TaskInstanceHistory.map_index == ti.map_index, - TaskInstanceHistory.try_number == old_try_number, - TaskInstanceHistory.task_instance_id == old_ti_id, - ) + tih = session.scalar( + select(TaskInstanceHistory).where( + TaskInstanceHistory.dag_id == ti.dag_id, + TaskInstanceHistory.task_id == ti.task_id, + TaskInstanceHistory.run_id == ti.run_id, + TaskInstanceHistory.map_index == ti.map_index, + TaskInstanceHistory.try_number == old_try_number, + TaskInstanceHistory.task_instance_id == old_ti_id, ) - is not None ) + assert tih is not None + assert tih.hostname == "random-hostname" def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self, dag_maker, session): dag_id = "test_reset_orphaned_tasks_external_triggered_dag"