Emit task.queued_duration metric on QUEUED -> RUNNING in Task SDK path #67190
Open
guhyunwoo wants to merge 9 commits into
In Airflow 2 the worker emitted task.queued_duration from
TaskInstance._check_and_change_state_before_execution on the
QUEUED -> RUNNING transition. In Airflow 3 the worker reaches RUNNING
through the Execution API endpoint
PATCH /execution/task-instances/{id}/run, which never called
TaskInstance.emit_state_change_metric — so task.queued_duration stopped
being emitted on every executor while task.scheduled_duration (still
emitted by the scheduler-side path) kept working.
Emit task.queued_duration inline from the run endpoint on the first
QUEUED -> RUNNING transition, preserving the legacy
"only on first try" semantics (skip when end_date is already set from
a previous attempt) and the same tag set (task_id, dag_id, queue).
closes: apache#66067
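The guard described above can be sketched in isolation. This is a minimal illustration, not the code in the PR: TIRow, maybe_emit_queued_duration, and the injected timing callable are hypothetical names chosen for the example, and the row is assumed to expose only the columns the guard needs.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


@dataclass
class TIRow:
    """Hypothetical stand-in for the partial-column row read by the handler."""

    task_id: str
    dag_id: str
    queue: str
    state: str
    queued_dttm: Optional[datetime]
    end_date: Optional[datetime]


def maybe_emit_queued_duration(row: TIRow, now: datetime, timing: Callable) -> bool:
    """Emit task.queued_duration only on the first QUEUED -> RUNNING transition.

    Returns True when a metric was emitted, False when the guard skipped it.
    """
    if row.state != "queued":
        return False
    # "Only on first try": a populated end_date means a previous attempt ran.
    if row.end_date is not None or row.queued_dttm is None:
        return False
    tags = {"task_id": row.task_id, "dag_id": row.dag_id, "queue": row.queue}
    timing("task.queued_duration", now - row.queued_dttm, tags=tags)
    return True
```

Passing the timing function in as an argument is a choice made for this sketch only; it keeps the example free of any real stats backend so the guard can be exercised with a plain recorder.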
closes: #66067
task.queued_duration (legacy: dag.<dag_id>.<task_id>.queued_duration) stopped being emitted in Airflow 3 regardless of executor (LocalExecutor, CeleryExecutor, KubernetesExecutor). The companion metric task.scheduled_duration still emits.
In Airflow 2 the worker emitted task.queued_duration from TaskInstance._check_and_change_state_before_execution (airflow-core/src/airflow/models/taskinstance.py:1366) on the QUEUED → RUNNING transition. In Airflow 3 the worker no longer touches the DB directly; it reaches RUNNING through the Execution API endpoint PATCH /execution/task-instances/{id}/run (airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py), which never called TaskInstance.emit_state_change_metric. The companion metric kept working because the scheduler still emits task.scheduled_duration directly from scheduler_job_runner.py:990 when it queues the TI; that path was not affected by the worker/API split.
Emit task.queued_duration inline from the run endpoint on the first QUEUED → RUNNING transition. The legacy TaskInstance.emit_state_change_metric is not reused here because the handler operates on a partial-column Row (not a TI ORM instance) and the update later resets end_date to None, which would defeat the method's "only on first try" guard for retries. Mirroring the guard inline (end_date is None and queued_dttm is not None) preserves the legacy semantics, including not emitting on retries, and uses the same tag set (task_id, dag_id, queue).
Two regression tests in airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
- test_ti_run_emits_queued_duration_metric: the first transition emits with the expected metric name, timing (now - queued_dttm), and tags.
- test_ti_run_skips_queued_duration_metric_on_retry: a retry (previous attempt's end_date populated) does not emit, matching legacy behavior.
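The shape of the two regression tests can be approximated without the Airflow test fixtures. The sketch below is an assumption-laden illustration: run_transition is a hypothetical stand-in for the run endpoint's state update, the task instance is modeled as a plain dict, and stats is a unittest.mock.Mock rather than Airflow's real stats client. It also shows why the guard has to be evaluated before the update, since the update itself clears end_date.

```python
from datetime import datetime, timedelta, timezone
from unittest import mock

NOW = datetime(2024, 1, 1, 12, 0, 12, tzinfo=timezone.utc)
TAGS = {"task_id": "t1", "dag_id": "d1", "queue": "default"}


def run_transition(ti: dict, now: datetime, stats) -> None:
    """Hypothetical stand-in for the QUEUED -> RUNNING update in the handler."""
    # Evaluate the guard first: the update below resets end_date to None,
    # after which a retry would be indistinguishable from a first try.
    first_try = ti["end_date"] is None and ti["queued_dttm"] is not None
    if first_try:
        tags = {k: ti[k] for k in ("task_id", "dag_id", "queue")}
        stats.timing("task.queued_duration", now - ti["queued_dttm"], tags=tags)
    ti.update(state="running", start_date=now, end_date=None)


def make_ti(end_date=None) -> dict:
    """Build a queued task instance that entered the queue 12 seconds ago."""
    return {
        **TAGS,
        "state": "queued",
        "queued_dttm": NOW - timedelta(seconds=12),
        "end_date": end_date,
    }


def test_emits_on_first_transition():
    stats = mock.Mock()
    ti = make_ti()
    run_transition(ti, NOW, stats)
    stats.timing.assert_called_once_with(
        "task.queued_duration", timedelta(seconds=12), tags=TAGS
    )
    assert ti["state"] == "running"


def test_skips_on_retry():
    stats = mock.Mock()
    ti = make_ti(end_date=NOW - timedelta(minutes=5))
    run_transition(ti, NOW, stats)
    stats.timing.assert_not_called()
    # The update still clears end_date, as the real handler is described to do.
    assert ti["end_date"] is None
```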