-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Add on_task_instance_skipped listener hookspec #59467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
kacpermuda
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, this will definitely help listeners to correctly "see" how the task finished it's execution, after they receive an information about task start. Left a few suggestions to improve the docstring.
Co-authored-by: Kacper Muda <mudakacper@gmail.com>
|
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions. |
Co-authored-by: Kacper Muda <mudakacper@gmail.com>
Co-authored-by: Kacper Muda <mudakacper@gmail.com>
Summary
This PR adds a new listener hookspec
on_task_instance_skippedthat is called when a task raisesAirflowSkipException.Related issue: closes #59370
Motivation
The current listener interface provides hooks for:
on_task_instance_running- when a task startson_task_instance_success- when a task succeedson_task_instance_failed- when a task failsBut there is no hook for when a task is skipped via
AirflowSkipException. This gap prevents listener implementations from handling self-skipping tasks.Scope & Limitations
This hook does NOT cover tasks skipped by:
BranchPythonOperator(tasks not in the selected branch)ShortCircuitOperatorFor comprehensive skip tracking across all skip types, use DAG-level events (
on_dag_run_success/on_dag_run_failed) which includeAirflowStateRunFacetwith the completetasksStatemap.Changes
airflow-core/src/airflow/listeners/spec/taskinstance.pyon_task_instance_skippedhookspectask-sdk/src/airflow/sdk/execution_time/task_runner.pyfinalize()when state is SKIPPEDtask-sdk/tests/task_sdk/execution_time/test_task_runner.pyFollow-up
OpenLineage provider implementation will be submitted in a separate PR after this is merged.
Testing
test_task_runner_calls_listeners_skippedthat verifies the listener is called when a task raisesAirflowSkipException^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.