-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Clearing ti.next_method and ti.next_kwargs on task finish. #19183
Clearing ti.next_method and ti.next_kwargs on task finish. #19183
Conversation
…_kwargs when a task finishes running.
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
|
|
def failure(): | ||
raise AirflowException | ||
|
||
def skip(): | ||
raise AirflowSkipException | ||
|
||
def success(): | ||
return None | ||
|
||
def reschedule(): | ||
reschedule_date = timezone.utcnow() | ||
raise AirflowRescheduleException(reschedule_date) | ||
|
||
_retries = 0 | ||
_retry_delay = datetime.timedelta(seconds=0) | ||
|
||
if state == State.FAILED: | ||
_python_callable = failure | ||
elif state == State.SKIPPED: | ||
_python_callable = skip | ||
elif state == State.SUCCESS: | ||
_python_callable = success | ||
elif state == State.UP_FOR_RESCHEDULE: | ||
_python_callable = reschedule | ||
elif state in [State.FAILED, State.UP_FOR_RETRY]: | ||
_python_callable = failure | ||
_retries = 1 | ||
_retry_delay = datetime.timedelta(seconds=2) | ||
|
||
with dag_maker("test_deferred_method_clear"): | ||
task = PythonOperator( | ||
task_id="test_deferred_method_clear_task", | ||
python_callable=_python_callable, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def failure(): | |
raise AirflowException | |
def skip(): | |
raise AirflowSkipException | |
def success(): | |
return None | |
def reschedule(): | |
reschedule_date = timezone.utcnow() | |
raise AirflowRescheduleException(reschedule_date) | |
_retries = 0 | |
_retry_delay = datetime.timedelta(seconds=0) | |
if state == State.FAILED: | |
_python_callable = failure | |
elif state == State.SKIPPED: | |
_python_callable = skip | |
elif state == State.SUCCESS: | |
_python_callable = success | |
elif state == State.UP_FOR_RESCHEDULE: | |
_python_callable = reschedule | |
elif state in [State.FAILED, State.UP_FOR_RETRY]: | |
_python_callable = failure | |
_retries = 1 | |
_retry_delay = datetime.timedelta(seconds=2) | |
with dag_maker("test_deferred_method_clear"): | |
task = PythonOperator( | |
task_id="test_deferred_method_clear_task", | |
python_callable=_python_callable, | |
def run(state): | |
if state == State.FAILED: | |
raise AirflowException | |
if state == State.SKIPPED: | |
raise AirflowSkipException | |
if state == State.UP_FOR_RESCHEDULE: | |
raise AirflowRescheduleException(timezone.utcnow()) | |
return None # SUCCESS | |
_retries = 0 | |
_retry_delay = datetime.timedelta(seconds=0) | |
if state in [State.FAILED, State.UP_FOR_RETRY]: | |
_retries = 1 | |
_retry_delay = datetime.timedelta(seconds=2) | |
with dag_maker("test_deferred_method_clear"): | |
task = PythonOperator( | |
task_id="test_deferred_method_clear_task", | |
python_callable=run, | |
op_args=[state], |
Do the retries
and retry_delay
differences actually matter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ReadytoRocc, I'm going to merge this for 2.2.1, but can you follow up with refactoring the test when you get the chance? Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jedcunningham thanks for the the suggestions. Will do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do the retries and retry_delay differences actually matter?
@jedcunningham I believe it would only matter for the following cases:
State.FAILED
needsretries
set to0
so that task goes into a state of failed.State.UP_FOR_RETRY
needsretries
set to1
so that task goes into a state of retry.
I believe we can set _retry_delay
once, default _retries = 0
, and then only set _retries = 1
, if State.UP_FOR_RETRY
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jedcunningham - PR for refactored tests: #19194
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
Awesome work, congrats on your first merged pull request! |
@ReadytoRocc, congrats on your first commit 🎉🚀 |
(cherry picked from commit 8a0d6c2)
(cherry picked from commit 8a0d6c2)
(cherry picked from commit 8a0d6c2)
Extending the changes of #18210 to all cases of a task finishing in
ti._run_raw_task()
.next_method
is not cleared when Clearing a Successful Task #19120next_method
andnext_kwargs
not cleared on retries #18146