Add on_skipped_callback in to BaseOperator #36374

Merged
merged 7 commits into from Jan 14, 2024

Conversation

romsharon98
Collaborator

@romsharon98 romsharon98 commented Dec 22, 2023

related: #35936



@romsharon98 romsharon98 marked this pull request as draft December 22, 2023 20:30
@eladkal eladkal changed the title draft: Add on skipped callback Add on_skipped_callback to BaseOperator Dec 22, 2023
@romsharon98 romsharon98 changed the title Add on_skipped_callback to BaseOperator Add on_skipped_callback Dec 22, 2023
@romsharon98 romsharon98 changed the title Add on_skipped_callback Add on_skipped_callback in to BaseOperator Dec 22, 2023
Member

@hussein-awala hussein-awala left a comment

The trigger rule dep could change the TI state to skipped without raising this exception, I wonder if we should handle that case too 🤔

changed = ti.set_state(new_state, session)

I'm not sure if we handle that for other callbacks

Collaborator

@dirrao dirrao left a comment

I have seen a scenario where the skipped state is set when branching doesn't follow some of the downstream tasks. Looks like that's not covered.

@eladkal
Contributor

eladkal commented Dec 23, 2023

I have seen a scenario where the skipped state is set when branching doesn't follow some of the downstream tasks. Looks like that's not covered.

I'm not sure if we can/should handle it. The goal here is to handle the case where the task is executed and its status then changes to skipped because a skip exception was raised. It is not meant to cover a skipped status cascading to downstream tasks (though maybe that's something we should think about). However, for the scope of this PR, I think we should clarify this case in the docs.
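
To make the scope described above concrete, here is a minimal sketch in plain Python rather than Airflow code. `SkipException`, `ToyTask`, and `run_task` are hypothetical stand-ins (in Airflow the exception is `AirflowSkipException` and the callback is passed to the operator as `on_skipped_callback`); the sketch only illustrates the idea that the callback fires when the skip is raised while the task is executing.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


class SkipException(Exception):
    """Hypothetical stand-in for Airflow's AirflowSkipException."""


@dataclass
class ToyTask:
    """Hypothetical, heavily simplified model of an operator and its task instance."""

    task_id: str
    python_callable: Callable[[], Any]
    on_skipped_callback: Optional[Callable[[dict], None]] = None
    state: str = "queued"


def run_task(task: ToyTask) -> str:
    """Run the callable; fire the callback only when a skip is raised during execution."""
    task.state = "running"
    try:
        task.python_callable()
    except SkipException:
        task.state = "skipped"
        if task.on_skipped_callback is not None:
            # The context here is a toy dict, not Airflow's real context object.
            task.on_skipped_callback({"task_id": task.task_id, "state": task.state})
    else:
        task.state = "success"
    return task.state


def nothing_to_do() -> None:
    raise SkipException("no new data")


notified: list = []
skipping = ToyTask("extract", nothing_to_do, on_skipped_callback=notified.append)
succeeding = ToyTask("load", lambda: None, on_skipped_callback=notified.append)

print(run_task(skipping), run_task(succeeding), notified)
```

Only the task whose callable raised the skip exception reaches the callback; a task that finishes normally never does.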

@romsharon98 romsharon98 force-pushed the feature/on-skipped-callback branch 2 times, most recently from 777ff5b to 8093aaa Compare December 23, 2023 14:40
@romsharon98
Collaborator Author

The trigger rule dep could change the TI state to skipped without raising this exception, I wonder if we should handle that case too 🤔

changed = ti.set_state(new_state, session)

I'm not sure if we handle that for other callbacks

As @eladkal commented, I think that should be handled in a separate PR that calls on_skipped_callback everywhere a task is skipped, not only when this exception is raised.

@romsharon98 romsharon98 marked this pull request as ready for review December 23, 2023 19:43
@romsharon98 romsharon98 force-pushed the feature/on-skipped-callback branch 2 times, most recently from 35755d3 to 468eec6 Compare December 23, 2023 19:54
@hussein-awala
Member

The mentioned issue mentions #30264 as a potential use case, but this PR will not close it, because the running tasks are marked as skipped without raising any skip exception:

if (
    dag_run.start_date
    and dag.dagrun_timeout
    and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
):
    dag_run.set_state(DagRunState.FAILED)
    unfinished_task_instances = session.scalars(
        select(TI)
        .where(TI.dag_id == dag_run.dag_id)
        .where(TI.run_id == dag_run.run_id)
        .where(TI.state.in_(State.unfinished))
    )
    for task_instance in unfinished_task_instances:
        task_instance.state = TaskInstanceState.SKIPPED
        session.merge(task_instance)
    session.flush()

So IMHO, if you want to split the callback implementation across different PRs, you need to update the description by replacing:

closes: #35936

with

related: #35936

and, of course, clarify that in the docs as @eladkal suggested.
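
The timeout path quoted in this comment can be modelled in a few lines of plain Python to show why no callback runs. This is a rough sketch, not Airflow code: `ToyTaskInstance`, `expire_run`, and `UNFINISHED` are hypothetical names, and the set of unfinished states is a simplified assumption standing in for `State.unfinished`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional

# Simplified assumption; not the real contents of State.unfinished.
UNFINISHED = {"queued", "scheduled", "running"}


@dataclass
class ToyTaskInstance:
    """Hypothetical stand-in for a task instance."""

    task_id: str
    state: str
    on_skipped_callback: Optional[Callable[[dict], None]] = None


def expire_run(start_date, dagrun_timeout, task_instances, now) -> bool:
    """Mimic the timeout branch; return True if the run timed out."""
    if start_date and dagrun_timeout and start_date < now - dagrun_timeout:
        for ti in task_instances:
            if ti.state in UNFINISHED:
                # Direct assignment: no exception is raised, so nothing
                # ever invokes ti.on_skipped_callback.
                ti.state = "skipped"
        return True
    return False


calls: list = []
tis = [
    ToyTaskInstance("a", "success", calls.append),
    ToyTaskInstance("b", "running", calls.append),
]
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
timed_out = expire_run(now - timedelta(hours=2), timedelta(hours=1), tis, now)

print(timed_out, [ti.state for ti in tis], calls)
```

The running task ends up skipped while `calls` stays empty, which is the gap this comment points out.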

@eladkal
Contributor

eladkal commented Dec 25, 2023

The mentioned issue mentions #30264 as a potential use case, but this PR will not close it

That was my edit to the description.

If you are OK with it, I think we can keep it as "closes" and open a new issue that aims to solve the second use case. It's just easier to track when an issue has a single, well-defined task.

@potiuk
Member

potiuk commented Dec 26, 2023

If you are OK with it, I think we can keep it as "closes" and open a new issue that aims to solve the second use case. It's just easier to track when an issue has a single, well-defined task.

I just updated the description to "related: #35936" instead of "closes:". That's enough to keep the issue linked without closing the original issue when this PR is merged.

@potiuk potiuk mentioned this pull request Dec 26, 2023
2 tasks
@jscheffl jscheffl added type:new-feature Changelog: New Features area:core labels Jan 2, 2024
@jscheffl jscheffl added this to the Airflow 2.9.0 milestone Jan 2, 2024
@potiuk potiuk merged commit 3eed501 into apache:main Jan 14, 2024
52 checks passed
abhishekbhakat pushed a commit to abhishekbhakat/my_airflow that referenced this pull request Mar 5, 2024

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
7 participants