on_failure_callback is not called when task is manually marked as failed #38935

Closed
FFCMSouza opened this issue Apr 11, 2024 · 8 comments
Labels
area:core, kind:bug, needs-triage

Comments

@FFCMSouza

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.1

What happened?

on_failure_callback is not called when a task is terminated externally.
A similar issue was reported in #25297 and fixed in #29743.
I saw that the code change that calls on_failure_callback in these cases is no longer present in version 2.7.1.
As a result, on_failure_callback is no longer called when a SIGTERM is received.

As you can see from the screenshots below, the function was never called; the logs are never printed.
[screenshots: task and callback logs showing the callback output was never printed]

What you think should happen instead?

on_failure_callback should be called when a task fails, including when the task was manually marked as failed.

How to reproduce

Create a DAG task with an on_failure_callback configured, start it, and then mark the task as failed. You can see from the log that the function was never called.
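
A minimal reproduction sketch, assuming an illustrative DAG with a BashOperator that sleeps long enough to be marked as failed from the UI; the DAG id, task id, and callback body are not from the report:

import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator


def notify_failure(context):
    # Per the report, this log line never appears when the task is
    # manually marked as failed from the UI.
    logging.error("on_failure_callback fired for %s", context["task_instance"].task_id)


with DAG(
    dag_id="failure_callback_repro",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    BashOperator(
        task_id="long_running",
        bash_command="sleep 600",  # leaves time to mark the task as failed in the UI
        on_failure_callback=notify_failure,
    )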

Operating System

debian

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

FFCMSouza added the area:core, kind:bug, and needs-triage labels on Apr 11, 2024

boring-cyborg bot commented Apr 11, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@FFCMSouza
Author

FFCMSouza commented Apr 11, 2024

I saw that in version 2.9.0, there is already a different type of exception (AirflowTaskTerminated) for this case.
I believe that the following change could be made to fix this behavior:

-          except (AirflowFailException, AirflowSensorTimeout) as e:
+          except (AirflowFailException, AirflowSensorTimeout, AirflowTaskTerminated) as e:
                # If AirflowFailException is raised, task should not retry.
                # If a sensor in reschedule mode reaches timeout, task should not retry.
                self.handle_failure(e, test_mode, context, force_fail=True, session=session)
                session.commit()
                raise
-          except (AirflowTaskTimeout, AirflowException, AirflowTaskTerminated) as e:
+          except (AirflowTaskTimeout, AirflowException) as e:
                if not test_mode:
                    self.refresh_from_db(lock_for_update=True, session=session)
                # for case when task is marked as success/failed externally
                # or dagrun timed out and task is marked as skipped
                # current behavior doesn't hit the callbacks
                if self.state in State.finished:
                    self.clear_next_method_args()
                    session.merge(self)
                    session.commit()
                    return None
                else:
                    self.handle_failure(e, test_mode, context, session=session)
                    session.commit()
                    raise

@ephraimbuddy
Contributor

This behaviour is documented in the code:

# for case when task is marked as success/failed externally
# or dagrun timed out and task is marked as skipped
# current behavior doesn't hit the callbacks
if self.state in State.finished:

So it's not a bug

@FFCMSouza
Author

Yes, I saw that. So there is no intention to change this behavior?
To me this doesn't make sense; marking a task as failed should trigger the failure callback.
If that's the case, is there any workaround to do this? I'm trying to use the on_kill method for this; does it make sense to you?

@ephraimbuddy
Contributor

Yes, I saw that. So there is no intention to change this behavior? To me this doesn't make sense; marking a task as failed should trigger the failure callback. If that's the case, is there any workaround to do this? I'm trying to use the on_kill method for this; does it make sense to you?

There are no plans to change the behaviour at the moment. Also, I don't know why you would want the callback to run when you intentionally failed a task

@FFCMSouza
Author

There are no plans to change the behaviour at the moment. Also, I don't know why you would want the callback to run when you intentionally failed a task

In my case, I use Airflow as an orchestrator to run Spark applications (spark-sql and spark-submit). When a task has failed or is marked as failed, in addition to notifying that the task has failed, I need to make sure that the corresponding step has been canceled in Spark. Most of the time, when a task actually fails, the cancel step operation is unnecessary, but this is not the case when a task is marked as failed.

I don't think it's such a rare scenario, plus similar issues have been opened before. I believe this behavior change would be very welcome in future Airflow releases.
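
A sketch of the kind of failure callback involved, assuming a hypothetical cancel_step helper (the reporter's own function, not an Airflow API) and an illustrative project name:

import logging


def cancel_step(project, dag_name, run_id, task_id):
    # Stub standing in for the reporter's helper; a real implementation would
    # call the Spark cluster's API to cancel the matching step.
    logging.info("cancelling Spark step for %s/%s/%s/%s", project, dag_name, run_id, task_id)


def cancel_spark_on_failure(context):
    # Desired behaviour: whenever the task ends up failed, including when it
    # is marked as failed in the UI, make sure the Spark step is cancelled.
    ti = context["task_instance"]
    cancel_step("my-project", ti.dag_id, context["run_id"], f"{ti.task_id}{ti.map_index}")

As described in the issue, a callback like this runs when the task genuinely fails, but not when the task is marked as failed from the UI.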

@FFCMSouza
Author

FFCMSouza commented Apr 14, 2024

Anyway, I managed to solve my problem by overriding the on_kill method in my Spark operator.
The only downside is that the on_kill method doesn't have access to the task context. To work around this, I set attributes in the execute method with the context information I need.

import logging

# Methods of the custom Spark operator (SparkToDataLake); cancel_step is the
# author's helper that cancels the corresponding Spark step.
def on_kill(self):
    logging.info('starting on_kill')
    cancel_step(self.project, self.dag_name, self.run_id, self.task_id)

def execute(self, context):
    self.log.info("starting SparkToDataLake.execute")

    # Stash the context details on the instance so on_kill (which receives
    # no context argument) can use them later.
    self.dag_name = context.get('task_instance').dag_id
    self.run_id = context.get('run_id')
    self.task_id = context.get('task_instance').task_id + str(context.get('task_instance').map_index)


PS: The on_kill method is also called on the AirflowTaskTimeout exception.
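
A minimal sketch of that, using a hypothetical BashOperator subclass (SparkToDataLakeStub) and an execution_timeout so the timeout path can be exercised:

from datetime import timedelta

from airflow.operators.bash import BashOperator


class SparkToDataLakeStub(BashOperator):
    # Illustrative stand-in for the custom operator, only to show where
    # on_kill hooks in.
    def on_kill(self):
        self.log.info("starting on_kill")
        super().on_kill()


task = SparkToDataLakeStub(
    task_id="spark_to_datalake",
    bash_command="sleep 600",
    # Per the PS above, on_kill is also invoked when this timeout raises
    # AirflowTaskTimeout.
    execution_timeout=timedelta(minutes=30),
)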

@ephraimbuddy
Contributor

I think it might be worth creating a feature request that explains how necessary this is and the use cases. As I see it, it's not required, because the user is performing a manual action in the UI; what's the need to run the callback when the same callback can be a task instead?

Closing since it's not a bug.
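
A sketch of that suggestion, i.e. running the cleanup as its own task with a trigger rule instead of a callback; the DAG, task ids, and cleanup body are illustrative:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def cancel_spark_step(**context):
    # Illustrative cleanup body; in the reporter's setup this would call
    # their cancel_step helper against the Spark cluster.
    print(f"cancelling Spark step for run {context['run_id']}")


with DAG(
    dag_id="cleanup_as_task_sketch",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    spark_job = BashOperator(task_id="spark_job", bash_command="sleep 600")

    cleanup = PythonOperator(
        task_id="cancel_spark_step",
        python_callable=cancel_spark_step,
        # ONE_FAILED schedules the cleanup whenever the upstream task fails,
        # which should also cover the case where it is marked as failed in the UI.
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    spark_job >> cleanup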
