Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/67372.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix ``TaskInstanceHistory`` records missing the hostname and other metadata (start/end date, duration, rendered map index) when a task is retried from a non-RUNNING state (e.g. killed externally, or failed while in the queued or deferred state)
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,10 @@ def _create_ti_state_update_query_and_update_state(
_handle_fail_fast_for_dag(ti=ti, dag_id=dag_id, session=session, dag_bag=dag_bag)
elif isinstance(ti_patch_payload, TIRetryStatePayload):
if ti is not None:
ti.end_date = ti_patch_payload.end_date
ti.set_duration()
if ti_patch_payload.rendered_map_index is not None:
ti._rendered_map_index = ti_patch_payload.rendered_map_index
ti.prepare_db_for_next_try(session)
# Store retry policy overrides so next_retry_datetime() can read them.
# These are cleared when the task enters RUNNING (ti_run).
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/taskinstancehistory.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def record_ti(ti: TaskInstance, session: Session = NEW_SESSION) -> None:
ti_history_state = ti.state
if ti.state not in State.finished:
ti_history_state = TaskInstanceState.FAILED
ti.end_date = timezone.utcnow()
ti.end_date = ti.end_date or timezone.utcnow()
ti.set_duration()
ti_history = TaskInstanceHistory(ti, state=ti_history_state)
session.add(ti_history)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1616,14 +1616,17 @@ def test_ti_update_state_handle_retry(self, client, session, create_task_instanc
ti = create_task_instance(
task_id="test_ti_update_state_to_retry",
state=State.RUNNING,
hostname="random-hostname",
)
ti.start_date = DEFAULT_START_DATE
session.commit()

response = client.patch(
f"/execution/task-instances/{ti.id}/state",
json={
"state": State.UP_FOR_RETRY,
"end_date": DEFAULT_END_DATE.isoformat(),
"rendered_map_index": "retry_label_abc",
},
)

Expand All @@ -1645,6 +1648,12 @@ def test_ti_update_state_handle_retry(self, client, session, create_task_instanc
).one()
assert tih.task_instance_id
assert tih.task_instance_id != ti.id
assert tih.state == State.FAILED
assert tih.hostname == "random-hostname"
assert tih.start_date == DEFAULT_START_DATE
assert tih.end_date == DEFAULT_END_DATE
assert tih.duration == 3600
assert tih.rendered_map_index == "retry_label_abc"

def test_ti_update_state_retry_with_policy_overrides(self, client, session, create_task_instance):
"""Test that retry_delay_seconds and retry_reason from a RetryPolicy are stored on the TI."""
Expand Down
23 changes: 12 additions & 11 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4862,6 +4862,8 @@ def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, sessi
ti = dr1.get_task_instances(session=session)[0]
ti.state = adoptable_state
ti.queued_by_job_id = old_job.id
ti.hostname = "random-hostname"
ti.start_date = DEFAULT_DATE
old_ti_id = ti.id
old_try_number = ti.try_number
session.merge(ti)
Expand All @@ -4873,19 +4875,18 @@ def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, sessi

ti.refresh_from_db(session=session)
assert ti.id != old_ti_id
assert (
session.scalar(
select(TaskInstanceHistory).where(
TaskInstanceHistory.dag_id == ti.dag_id,
TaskInstanceHistory.task_id == ti.task_id,
TaskInstanceHistory.run_id == ti.run_id,
TaskInstanceHistory.map_index == ti.map_index,
TaskInstanceHistory.try_number == old_try_number,
TaskInstanceHistory.task_instance_id == old_ti_id,
)
tih = session.scalar(
select(TaskInstanceHistory).where(
TaskInstanceHistory.dag_id == ti.dag_id,
TaskInstanceHistory.task_id == ti.task_id,
TaskInstanceHistory.run_id == ti.run_id,
TaskInstanceHistory.map_index == ti.map_index,
TaskInstanceHistory.try_number == old_try_number,
TaskInstanceHistory.task_instance_id == old_ti_id,
)
is not None
)
assert tih is not None
assert tih.hostname == "random-hostname"

def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self, dag_maker, session):
dag_id = "test_reset_orphaned_tasks_external_triggered_dag"
Expand Down
Loading