Add duration, queued_dttm, and start_date to RuntimeTaskInstance for listener hooks#64600
Add duration, queued_dttm, and start_date to RuntimeTaskInstance for listener hooks#64600dandanseo123 wants to merge 1 commit intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR aims to restore AF2-era listener-hook compatibility in AF3 by exposing task-level runtime timing fields on RuntimeTaskInstance, specifically duration (computed) and queued_dttm (propagated from the Execution API).
Changes:
- Add
RuntimeTaskInstance.durationas a computed property based onstart_date/end_dateand extendRuntimeTaskInstanceProtocolaccordingly. - Add
queued_dttmtoRuntimeTaskInstanceand flow it from the Execution APIti_runresponse (TaskInstance.queued_dttm→TIRunContext→StartupDetails→parse()). - Add/extend unit tests for both Task SDK runtime objects and the Execution API response payload.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| task-sdk/src/airflow/sdk/execution_time/task_runner.py | Adds queued_dttm, adds computed duration, and threads queued_dttm into parse(); refactors template-context caching. |
| task-sdk/src/airflow/sdk/types.py | Extends RuntimeTaskInstanceProtocol with queued_dttm and duration. |
| task-sdk/src/airflow/sdk/api/datamodels/_generated.py | Adds queued_dttm to generated TIRunContext SDK model. |
| task-sdk/tests/task_sdk/execution_time/test_task_runner.py | Adds Task SDK tests for duration and queued_dttm. |
| airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py | Includes queued_dttm in the ti_run query and conditionally sets it on TIRunContext. |
| airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py | Adds queued_dttm field to server-side TIRunContext. |
| airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py | Adds tests asserting queued_dttm inclusion/omission in ti_run responses. |
Comments suppressed due to low confidence (1)
task-sdk/src/airflow/sdk/api/datamodels/_generated.py:666
task-sdk/src/airflow/sdk/api/datamodels/_generated.pyis generated (see header). Addingqueued_dttmhere should come from regenerating the SDK models from the updated Execution API OpenAPI schema (after introducing the schema change via a new Cadwyn version). Otherwise it’s easy for this file to drift from the actual server schema/version.
Suggested fix: regenerate the Task SDK datamodels via the repo’s OpenAPI/datamodel-codegen workflow, rather than hand-editing this generated module, and ensure the SDK’s API_VERSION aligns with the new server version that introduces queued_dttm.
next_kwargs: Annotated[dict[str, Any] | str | None, Field(title="Next Kwargs")] = None
xcom_keys_to_clear: Annotated[list[str] | None, Field(title="Xcom Keys To Clear")] = None
should_retry: Annotated[bool | None, Field(title="Should Retry")] = False
queued_dttm: Annotated[AwareDatetime | None, Field(title="Queued Dttm")] = None
airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
Show resolved
Hide resolved
|
@ashb @kaxil @amoghrajesh would appreciate a review when you get a chance. Thanks! |
|
@dandanseo123 This PR has been converted to draft because it does not yet meet our Pull Request quality criteria. Issues found:
What to do next:
Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates. If you have questions, feel free to ask on the Airflow Slack. |
af4167b to
19c3dc0
Compare
30519a3 to
78ab502
Compare
78ab502 to
a2cd22d
Compare
6f4d86a to
619bfd8
Compare
619bfd8 to
b7349b2
Compare
Description:
In AF2, listener hooks (
on_task_instance_success/failed) received the full ORMTaskInstancemodel, which includesduration,queued_dttm, andstart_date. In AF3, these hooks receiveRuntimeTaskInstance— a minimal runtime object missing these fields. Plugins relying on them for metrics break when migrating to AF3.Additionally, DAG-level hooks (
on_dag_run_*) already expose queue time viaDagRun.queued_at, but task-level hooks do not expose the equivalentqueued_dttm. This is inconsistent across the listener API.Changes
duration: added as a computed property onRuntimeTaskInstance:(end_date - start_date).total_seconds(). This matches the ORM'sset_duration()formula.queued_dttm: Flows from DB →TIRunContext→RuntimeTaskInstancevia the Execution APIti_runendpoint.durationis computed correctly for deferred tasks (previously used the supervisor process start time, under-reporting duration).All three fields added to RuntimeTaskInstanceProtocol.
Use case
Listener plugins that publish task execution metrics need
duration(how long the task ran) andqueued_dttm(how long it waited in queue before starting). Without these fields, plugin authors must either query the DB directly from the listener hook or accept missing metrics, neither is ideal.Generated-by: Claude Code following the guidelines