Deferred tasks interacting with external systems (BQ, DBX, etc.) have no standard way to cancel the external job when a user marks the task failed, clears it, or marks it succeeded. This leaves orphaned jobs running in the background.
I intend to fix that by adding on_kill() to BaseTrigger — a no-op by default that trigger authors override to cancel their external job when the task is acted on by a user.
The triggerer already passes trigger IDs through a to_cancel queue when a trigger is removed. Everything in that queue is a user action: redistribution only happens when a triggerer is considered dead, at which point it is no longer processing its queue. We use asyncio.Task.cancel("user-action") to pass the reason through the existing cancellation path, so run_trigger() can check e.args[0] and invoke on_kill() without any extra API round-trip.
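To make the mechanism concrete, here is a minimal, Airflow-free sketch of how a message set via asyncio.Task.cancel(msg) surfaces inside the cancelled coroutine. The names run_trigger_sketch, simulate, and USER_ACTION are illustrative, not Airflow APIs:

```python
import asyncio

USER_ACTION = "user-action"

async def run_trigger_sketch(results: list[str]) -> None:
    """Stand-in for the triggerer's run_trigger() wrapper."""
    try:
        await asyncio.sleep(3600)  # the trigger's run() would await here
    except asyncio.CancelledError as e:
        # task.cancel(msg) delivers msg via e.args on the cancelled task.
        if e.args and e.args[0] == USER_ACTION:
            results.append("on_kill")  # user action: would await trigger.on_kill()
        else:
            results.append("skip")  # shutdown/redeploy: leave the external job alone
        raise  # always propagate CancelledError

async def simulate(msg: str) -> list[str]:
    results: list[str] = []
    task = asyncio.create_task(run_trigger_sketch(results))
    await asyncio.sleep(0)  # let the task reach its await point
    task.cancel(msg)
    try:
        await task
    except asyncio.CancelledError:
        pass
    return results

print(asyncio.run(simulate("user-action")))          # ['on_kill']
print(asyncio.run(simulate("EOF - shutting down")))  # ['skip']
```

The same dispatch works for the shutdown message, which is why no extra state or API call is needed to tell user actions apart from ordinary exits.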
Impact
Existing trigger subclasses
Purely additive. on_kill() defaults to a no-op so nothing breaks if you do not implement it.
Triggerer shutdown / redistribution
No performance impact. on_kill() is only invoked when CancelledError carries the "user-action" message. Shutdown fires task.cancel("EOF - shutting down") and redistribution goes through the to_create path — neither triggers on_kill().
Provider authors
Override on_kill() to cancel external jobs. The framework guarantees it is only called for user-initiated cancellations, so you can implement it without guarding against shutdowns or redeploys.
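As an illustration of the pattern a provider might follow (ExternalJobClient, ExternalJobTrigger, and "job-123" are made-up stand-ins, not a real provider API):

```python
import asyncio

class ExternalJobClient:
    """Stand-in for a provider hook (e.g. a Databricks or BigQuery client)."""
    def __init__(self) -> None:
        self.cancelled: list[str] = []

    def cancel_job(self, job_id: str) -> None:
        # A real client would call the service's cancel endpoint here.
        self.cancelled.append(job_id)

class ExternalJobTrigger:
    """Would subclass BaseTrigger in real provider code."""
    def __init__(self, job_id: str, client: ExternalJobClient) -> None:
        self.job_id = job_id
        self.client = client

    async def on_kill(self) -> None:
        # Only ever called for user-initiated cancellations, so it is
        # safe to stop the external job unconditionally.
        self.client.cancel_job(self.job_id)

client = ExternalJobClient()
asyncio.run(ExternalJobTrigger("job-123", client).on_kill())
print(client.cancelled)  # ['job-123']
```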
Testing
Trigger
```python
import asyncio
import logging
import random
from collections.abc import AsyncIterator

from airflow.triggers.base import BaseTrigger, TriggerEvent

log = logging.getLogger(__name__)

SENTINEL_FILE = "/tmp/on_cancel_was_called"


class VerifyOnCancelTrigger(BaseTrigger):
    """
    Trigger that sleeps forever. on_kill() writes a sentinel file so we can
    verify it was called without needing a real external service like
    Databricks or BigQuery.
    """

    def serialize(self) -> tuple[str, dict]:
        return ("verify_on_cancel_trigger.VerifyOnCancelTrigger", {})

    async def run(self) -> AsyncIterator[TriggerEvent]:
        log.info("VerifyOnCancelTrigger running — mark this task as failed to test on_kill()")
        await asyncio.sleep(99999)
        yield TriggerEvent("done")  # never reached

    async def on_kill(self) -> None:
        log.info("VerifyOnCancelTrigger.on_kill() CALLED — simulated external job cancelled")
        with open(SENTINEL_FILE + "-" + str(random.randint(1, 10000)), "w") as f:
            f.write("cancelled")
```
DAG:
```python
from datetime import datetime

from airflow import DAG
from airflow.models import BaseOperator

# VerifyOnCancelTrigger and log come from the trigger module above.


class DeferForeverOperator(BaseOperator):
    def execute(self, context):
        self.defer(trigger=VerifyOnCancelTrigger(), method_name="resume")

    def resume(self, context, event=None):
        log.info("Resumed after trigger fired (should not happen in cancel test)")


with DAG(
    dag_id="test_on_cancel",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    tags=["manual-test"],
) as dag:
    DeferForeverOperator(task_id="deferred_task")
```
The trigger is built so that when on_kill() is called (for example, after I mark the task failed from the UI), it creates a sentinel file we can check for.
Cancelled trigger
The first task instance is marked failed, the second marked success, and the third cancelled.
Before run
```
root@0c61b077a443:/opt/airflow$ ls /tmp/on_cancel_was_called
ls: cannot access '/tmp/on_cancel_was_called': No such file or directory
[Breeze:3.10.19] root@0c61b077a443:/opt/airflow$
```
After run
The screenshot shows three sentinel files, one created for each of the three actions above.
What's next?
- Databricks triggers: add on_kill() to DatabricksExecutionTrigger and DatabricksSQLStatementExecutionTrigger (follow-up PR)
- BigQuery trigger: migrate the duplicated safe_to_cancel/get_task_state boilerplate to on_kill()
As discussed on Slack with you and @ashb, two suggestions:
1. Rename on_cancel → on_kill
cancel already means asyncio.Task.cancel() / CancelledError in async land (fires on every exit: shutdown, redeploy, timeout, user action). on_kill is Airflow's established vocabulary for "task is being killed, stop the external work" -- see BaseOperator.on_kill(). DBX/BQ/Spark operators already implement it. Same name on the trigger is symmetric with the operator and avoids the CancelledError overlap.
2. Distinguish from cleanup() in the docstring
BaseTrigger.cleanup() already runs on every exit via the finally at triggerer_job_runner.py:1338-1353, including triggerer redeploys where the trigger hops to another process and keeps watching the same external job. If "cancel external work" went in cleanup, every rolling triggerer deploy would kill in-flight work.
So cleanup() = "this trigger instance is leaving this process" (release local resources) and the new hook = "the task itself is being killed" (stop external work). The docstring should spell this out; otherwise the obvious implementation is to put the DBX/BQ cancel call in cleanup(), which passes tests but breaks in-flight work on the first triggerer rollout.
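That division of responsibilities can be sketched as follows (SketchTrigger and the two scenario helpers are hypothetical, not Airflow code):

```python
import asyncio

class SketchTrigger:
    """Hypothetical trigger that records which hooks ran."""
    def __init__(self) -> None:
        self.events: list[str] = []

    async def cleanup(self) -> None:
        # Runs on EVERY exit, including triggerer redeploys, so it must
        # only release process-local resources (sessions, file handles).
        self.events.append("released local resources")

    async def on_kill(self) -> None:
        # Runs only when the task itself is killed, so stopping the
        # external job here is safe.
        self.events.append("cancelled external job")

async def triggerer_redeploy(t: SketchTrigger) -> None:
    # The trigger hops to another process; the external job must survive.
    await t.cleanup()

async def user_marks_task_failed(t: SketchTrigger) -> None:
    # User action: stop the external work, then release local resources.
    await t.on_kill()
    await t.cleanup()

a, b = SketchTrigger(), SketchTrigger()
asyncio.run(triggerer_redeploy(a))
asyncio.run(user_marks_task_failed(b))
print(a.events)  # ['released local resources']
print(b.events)  # ['cancelled external job', 'released local resources']
```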
amoghrajesh changed the title from "Add on_cancel() hook to BaseTrigger to handle user actions on triggers" to "Add on_kill() hook to BaseTrigger to handle user actions on triggers" on Apr 21, 2026.
closes: #36090