Preserve sensor start_date when scheduler advances reschedule to queued #66790

Open
1fanwang wants to merge 1 commit into apache:main from 1fanwang:fix/reschedule-start-date-multi-scheduler

@1fanwang 1fanwang commented May 12, 2026

Fix for the metric skew called out in #66784. On our LinkedIn DI cluster every poke of a reschedule-mode sensor resets start_date to utcnow(), which inflates first_task_scheduling_delay by the cumulative wait of all reschedules. Our scheduling-delay dashboards skew high enough that they mask real scheduling delay. This PR preserves the original start_date across reschedule → queued transitions.

Problem

A sensor in mode="reschedule" loses its first-poke start_date on every re-execution. The supervisor sends start_date=utcnow() as part of the ti_run execution-API payload on each poke, and the endpoint writes that value through unconditionally. Net effect: dagrun.first_task_scheduling_delay (computed from start_date - queued_at) collapses to ~0 for any DAG fronted by a reschedule-mode sensor, even when the sensor waited minutes or hours.

A guard already exists for deferred tasks at airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:180-182:

if ti.next_kwargs:
    data.pop("start_date")

There is no equivalent for rescheduled tasks. A legacy guard exists in _check_and_change_state_before_execution (airflow-core/src/airflow/models/taskinstance.py:1315) but is gated on ti.state == UP_FOR_RESCHEDULE, which never holds at the time the worker runs the check: the scheduler transitions UP_FOR_RESCHEDULE -> QUEUED before the worker picks up the task, so refresh_from_db returns QUEUED and the lookup is skipped.
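The ordering problem described above can be illustrated with a small stand-alone sketch. It is not Airflow code: `State` and `gated_lookup_runs` are hypothetical names introduced here, and the only assumption taken from the description is that the scheduler moves the task to QUEUED before the worker evaluates the gate.

```python
from enum import Enum


class State(str, Enum):
    # Hypothetical stand-in for the two task-instance states discussed above.
    UP_FOR_RESCHEDULE = "up_for_reschedule"
    QUEUED = "queued"


def gated_lookup_runs(state_seen_by_worker: State) -> bool:
    """Mimic the legacy gate: the TaskReschedule lookup runs only in this state."""
    return state_seen_by_worker == State.UP_FOR_RESCHEDULE


# Timeline of one reschedule cycle, as described in the paragraph above.
state = State.UP_FOR_RESCHEDULE  # the poke returned False; the sensor yields its slot
state = State.QUEUED             # the scheduler re-queues the task first
lookup_ran = gated_lookup_runs(state)  # the worker only ever observes QUEUED
```

Under that ordering `lookup_ran` is `False`, so the start_date restoration behind the gate is skipped on every poke.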

Fix

Two changes:

  1. task_instances.py::ti_run (production path) — When start_date is present in the payload and the task has prior TaskReschedule rows, restore start_date from the first row instead of accepting utcnow(). Mirror the same value into context.start_date on the response so the supervisor pins context["ti"].start_date to the first poke as well.

  2. taskinstance.py::_check_and_change_state_before_execution (legacy / test-utility path) — Drop the ti.state == UP_FOR_RESCHEDULE gate. The lookup is scoped by ti.id, and prepare_db_for_next_try clears TaskReschedule rows and rotates ti.id on each retry, so try-number scoping is implicit and the query is harmless for non-rescheduled tasks.
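As a rough illustration of the first change, the sketch below models the decision the endpoint makes about start_date. It is a simplified stand-in, not the code in this PR: `resolve_start_date` and its parameters are hypothetical, the reschedule rows are reduced to a plain list of datetimes, and "first row" is assumed to mean the earliest recorded start_date.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def resolve_start_date(
    payload: dict,
    has_next_kwargs: bool,
    reschedule_start_dates: list[datetime],
) -> dict:
    """Return the fields that would be written to the task instance row.

    Hypothetical stand-in for the guard logic in ti_run; not Airflow code.
    """
    data = dict(payload)  # copy so the caller's payload is left untouched
    if "start_date" not in data:
        return data
    if has_next_kwargs:
        # Deferral guard: a task resuming from deferral keeps its stored start_date.
        data.pop("start_date")
    elif reschedule_start_dates:
        # Reschedule guard: pin start_date to the first recorded poke.
        data["start_date"] = min(reschedule_start_dates)
    return data


first_poke = datetime(2026, 5, 12, 9, 0, tzinfo=timezone.utc)
later_poke = first_poke + timedelta(minutes=30)

rescheduled = resolve_start_date(
    {"start_date": later_poke},
    False,
    [first_poke, first_poke + timedelta(minutes=10)],
)
fresh = resolve_start_date({"start_date": later_poke}, False, [])
deferred = resolve_start_date({"start_date": later_poke}, True, [])
```

In this model a task with prior reschedule rows gets `first_poke`, a task with none keeps the payload value, and a deferred task drops the field entirely, matching the three cases the description distinguishes.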

Why try_number scoping is implicit

prepare_db_for_next_try (airflow-core/src/airflow/models/taskinstance.py:973-979):

def prepare_db_for_next_try(self, session: Session):
    ...
    session.execute(delete(TaskReschedule).filter_by(ti_id=self.id))
    self.id = uuid7()

On every retry, all TaskReschedule rows for the previous try's ti.id are deleted and the TI gets a fresh UUID. So rows with ti_id == current task_instance_id always belong to the current try. No additional try_number filter is needed.
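The scoping argument can be checked with an in-memory model. This is a sketch under stated assumptions: `FakeTI`, `record_reschedule`, and `prepare_for_next_try` are hypothetical names, the reschedule table is a plain dict keyed by ti_id, and `uuid.uuid4()` stands in for `uuid7()` because only uniqueness matters here.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class FakeTI:
    # Hypothetical task instance holding only the id that scopes reschedule rows.
    id: uuid.UUID = field(default_factory=uuid.uuid4)


# Stand-in for the TaskReschedule table: ti_id -> list of row labels.
reschedules: dict[uuid.UUID, list[str]] = {}


def record_reschedule(ti: FakeTI, label: str) -> None:
    reschedules.setdefault(ti.id, []).append(label)


def prepare_for_next_try(ti: FakeTI) -> None:
    """Mirror the two steps quoted above: delete rows for the old id, rotate the id."""
    reschedules.pop(ti.id, None)
    ti.id = uuid.uuid4()


ti = FakeTI()
record_reschedule(ti, "try1-poke1")
record_reschedule(ti, "try1-poke2")
old_id = ti.id

prepare_for_next_try(ti)
rows_for_new_try = list(reschedules.get(ti.id, []))  # lookup scoped by the new id

record_reschedule(ti, "try2-poke1")
```

After the retry, a lookup by the current id sees no rows from the previous try, and rows recorded afterwards belong only to the new try.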

Reproducer

from airflow.providers.standard.sensors.python import PythonSensor

PythonSensor(
    task_id="poll",
    mode="reschedule",
    poke_interval=10,
    timeout=300,
    python_callable=lambda: False,  # forces reschedule until timeout
)

Before the fix, ti.start_date advances by ~poke_interval on every reschedule. After the fix, ti.start_date stays pinned at the first-poke value and dagrun.first_task_scheduling_delay reflects the actual wait.
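The drift can be made concrete with a small simulation. It is a sketch, not a run against Airflow: `simulate` is a hypothetical helper, each poke is assumed to land exactly one poke_interval after the previous one, and the reschedule rows are modelled as a list of recorded start dates.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

POKE_INTERVAL = timedelta(seconds=10)  # matches poke_interval=10 in the reproducer
FIRST_POKE = datetime(2026, 5, 12, 9, 0, 0, tzinfo=timezone.utc)


def simulate(pokes: int, preserve: bool) -> datetime:
    """Return ti.start_date after `pokes` executions of the sensor."""
    start_date = FIRST_POKE
    reschedule_rows: list[datetime] = []  # start dates recorded per reschedule
    for i in range(pokes):
        now = FIRST_POKE + i * POKE_INTERVAL  # what utcnow() would return at this poke
        if preserve and reschedule_rows:
            start_date = reschedule_rows[0]  # fixed behaviour: pin to the first row
        else:
            start_date = now  # old behaviour: payload value written through
        reschedule_rows.append(start_date)
    return start_date


before_fix = simulate(pokes=6, preserve=False)
after_fix = simulate(pokes=6, preserve=True)
drift = before_fix - FIRST_POKE
```

With six pokes the unfixed path ends 50 seconds past the first poke, while the fixed path stays at the first-poke value.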

Tests

  • airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py::TestTIRunState::test_ti_run_restores_start_date_for_rescheduled_task -- the API path restores start_date from TaskReschedule on a subsequent poke
  • airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py::TestTIRunState::test_ti_run_uses_payload_start_date_when_no_reschedule_rows -- non-rescheduled tasks preserve the payload value
  • airflow-core/tests/unit/models/test_taskinstance.py::TestTaskInstance::test_check_and_change_state_before_execution_restores_reschedule_start_date -- the legacy method restores start_date even when the scheduler has already advanced state to QUEUED

Prior effort

The same fix was attempted earlier in #64816 by @peachchen0716, which converged on the same two changes (a reschedule guard in ti_run, plus dropping the UP_FOR_RESCHEDULE gate in _check_and_change_state_before_execution). That PR was converted to draft on 2026-04-22 because of merge conflicts and unresolved review comments, and was auto-closed on 2026-05-05 after author inactivity. This PR differs in two ways: it uses the existing TR.stmt_for_task_instance helper instead of the hand-rolled select used there, and it mirrors the restored value into context.start_date on the response so the supervisor pins context["ti"].start_date to the first poke as well. peachchen0716 is credited as co-author on the commit.

Closes #66784

@boring-cyborg (bot) added the area:API (Airflow's REST/HTTP API) and area:task-sdk labels on May 12, 2026
@1fanwang force-pushed the fix/reschedule-start-date-multi-scheduler branch from d89cd36 to 7ac3847 on May 13, 2026 18:55
@choo121600 added the ready for maintainer review label (set after triaging when all criteria pass) on May 15, 2026
When a sensor runs in mode="reschedule", the supervisor sends
start_date=utcnow() on every poke. The ti_run execution-API endpoint
wrote that value through unconditionally, so ti.start_date drifted
forward on each re-execution. dagrun.first_task_scheduling_delay
(computed from start_date - queued_at) collapsed to ~0 for any DAG
fronted by a reschedule-mode sensor.

Add a reschedule guard in ti_run that mirrors the existing
deferral guard: when start_date is present in the payload and the task
has prior TaskReschedule rows, restore start_date from the first row
instead of accepting the supervisor's utcnow(). The lookup is scoped by
ti_id, and prepare_db_for_next_try clears TaskReschedule rows and
rotates ti.id on each retry, so try-number scoping is implicit and no
stale rows leak across tries. Also mirror context.start_date in the
response so the supervisor pins context["ti"].start_date to the first
poke as well.

Also drop the unreliable state guard on the same path in
_check_and_change_state_before_execution. In the normal multi-scheduler
flow the scheduler advances UP_FOR_RESCHEDULE -> QUEUED before the
worker calls this method, so refresh_from_db returns QUEUED and the
guard never fires. The TaskReschedule lookup is harmless for
non-rescheduled tasks (returns no rows), so the guard can go.

Closes apache#66784

Co-authored-by: peachchen0716 <peachchen0716@users.noreply.github.com>

Signed-off-by: 1fanwang <1fannnw@gmail.com>

Development

Successfully merging this pull request may close these issues.

Sensor reschedule: start_date reset to utcnow() on every poke (inflates first_task_scheduling_delay)
