Fix missing task.duration metric for tasks that get skipped after starting.#67943
Open
myps6415 wants to merge 1 commit into
Open
Fix missing task.duration metric for tasks that get skipped after starting.#67943myps6415 wants to merge 1 commit into
myps6415 wants to merge 1 commit into
Conversation
task.duration (and its registry-derived legacy name dag.<dag_id>.<task_id>.duration) was emitted for tasks ending in SUCCESS or FAILED but missing for SKIPPED tasks, despite being documented as available for all terminal states. The metric is emitted from finalize() in the Task SDK task_runner, gated on `if ti.start_date and ti.end_date:`. The SUCCESS and FAILED exception handlers set ti.end_date on the local RuntimeTaskInstance before constructing the outbound TaskState message, so the guard passes. The AirflowSkipException handler (and the SKIPPED branch of DagRunTriggerException with skip_when_already_exists=True) set end_date only on the outbound TaskState message, leaving ti.end_date as None — so finalize()'s guard failed and the metric was never emitted for skipped tasks. Set ti.end_date on the local instance in both SKIPPED handlers, mirroring the AirflowFailException and _handle_current_task_success patterns. The TaskState message references ti.end_date to keep the local instance and the outbound message in sync.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
task.duration(and its registry-derived legacy namedag.<dag_id>.<task_id>.duration) was emitted for tasks ending inSUCCESSorFAILEDbut missing entirely forSKIPPEDtasks, despite the metric being documented as available for all terminal states. Reproducible withShortCircuitOperator,BranchPythonOperator, andBashOperatorwithskip_on_exit_code(narrowed in #61849 by @shivaam).Fix
The metric is emitted from
finalize()in the Task SDKtask_runner, gated onif ti.start_date and ti.end_date:. TheSUCCESSandFAILEDexception handlers setti.end_dateon the localRuntimeTaskInstancebefore constructing the outboundTaskStatemessage, so the guard passes. The twoSKIPPEDhandlers (AirflowSkipException, and theSKIPPEDbranch ofDagRunTriggerExceptionwithskip_when_already_exists=True) setend_dateonly on the outboundTaskStatemessage, leavingti.end_dateasNone— sofinalize()'s guard failed for skipped tasks.Set
ti.end_dateon the local instance in both handlers, mirroring theAirflowFailExceptionand_handle_current_task_successpatterns. TheTaskStatemessage referencesti.end_dateto keep the local instance and the outbound message in sync.Test plan
test_task_duration_metric_emitted_for_terminal_statescoverssuccess/skipped/failed— verifiesstats.timing("task.duration", ...)is called with the correct tags and that the legacy dotted form is also emitted via themetrics_template.yamlregistry.[skipped]parametrize case fails without the fix and passes with it;[success]and[failed]pass in both cases.test_run_basic_skipped,test_task_runner_calls_listeners_skipped, etc.) still pass — no regression.ruff format/ruff check/mypy-task-sdk/prek run --from-ref upstream/main --stage pre-commitall green.Note:
test_handle_trigger_dag_run_conflict[True-skipped]was already failing onupstream/maindue to a mock-assertion mismatch (the test usesmock.call.send(msg=TriggerDagRun(...))as kwarg but_handle_trigger_dag_runcallsSUPERVISOR_COMMS.send(TriggerDagRun(...))positionally). Unrelated to this PR.closes: #61849
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.7) following the guidelines