Forward termination signals from supervisor to task subprocess#61627
Forward termination signals from supervisor to task subprocess#61627andreahlert wants to merge 2 commits intoapache:mainfrom
Conversation
When a Kubernetes worker pod receives SIGTERM (e.g. spot interruption, scaling down), the signal is delivered to the supervisor process (PID 1 in the container). Previously, the supervisor had no signal handler and would exit with default behavior, leaving the task subprocess orphaned without ever calling the operator's on_kill() hook. This meant spawned resources (pods, subprocesses, etc.) were never cleaned up. This change installs SIGTERM/SIGINT signal handlers in the supervise() function that forward the received signal to the task subprocess. The child process already has a signal handler (registered in task_runner.py) that calls on_kill() when it receives SIGTERM, so forwarding the signal completes the chain and restores the Airflow 2 behavior. Fixes: apache#58936
Remove SystemExit from test's on_kill() to match realistic operator behavior, and add SIGKILL safety net in the background thread to prevent the test from hanging if signal forwarding fails.
| try: | ||
| os.kill(process.pid, signum) | ||
| except ProcessLookupError: | ||
| pass |
There was a problem hiding this comment.
Are you swallowing the exception here because you anticipate that the child worker has been killed before invoking os.kill? If that is the case, I would add a comment here explaining that. Like this:
# Child process may have already exited during shutdown races.
This is more a nit but silent exception swallowing tends to raise eyebrows for readers who might have partial context.
| except ProcessLookupError: | ||
| pass | ||
|
|
||
| signal.signal(signal.SIGTERM, _forward_signal) |
There was a problem hiding this comment.
Your new implementation handles both SIGTERM and SIGINT, but you appear to be testing only SIGTERM here. Is this because you only anticipate K8s to send SIGTERM? I would suggest explaining that here in a comment so that a casual reader does not assume this is a gap. Not a blocking suggestion.
| def execute(self, context): | ||
| for i in range(1000): | ||
| print(f"Iteration {i}") | ||
| sleep(1) |
There was a problem hiding this comment.
I get why the loop needs to run “long enough” so the subprocess is alive when the signal is delivered, but 1000 iterations at 1s each feels a bit overkill. If for some reason the subprocess doesn’t get terminated as expected, this could run for ~15 minutes and materially stall CI. Is there any reason this can’t be reduced to a smaller number (e.g. 30–60 iterations) while still leaving plenty of headroom for signal delivery?
Fixes: #58936
Summary
When a Kubernetes worker pod receives SIGTERM (e.g. spot interruption, scaling down, rolling update), the signal is delivered to the supervisor process (PID 1 in the container). Previously, the supervisor had no signal handler installed and would exit with default behavior, leaving the task subprocess orphaned without ever calling the operator's
on_kill()hook. This meant spawned resources (pods, subprocesses, etc.) were never cleaned up.Root cause: The
supervise()function starts the task subprocess and callsprocess.wait(), but never installs signal handlers for SIGTERM/SIGINT. The task subprocess does have a SIGTERM handler (registered intask_runner.py) that callson_kill(), but the signal never reaches it because the supervisor process terminates first.Fix: Install SIGTERM/SIGINT signal handlers in
supervise()that forward the received signal to the task subprocess viaos.kill(). The child's existing handler then callson_kill()as expected, restoring the Airflow 2 behavior.Signal flow after fix:
_on_termhandler callsoperator.on_kill()wait()returns normallyChanges
task-sdk/src/airflow/sdk/execution_time/supervisor.py: Added signal forwarding insupervise()function. Signal handlers are saved, installed beforeprocess.wait(), and restored in afinallyblock.task-sdk/tests/task_sdk/execution_time/test_supervisor.py: Added test that verifies SIGTERM forwarding from supervisor to subprocess triggers the operator'son_kill()hook.Test plan
test_on_kill_hook_called_when_supervisor_receives_sigtermverifies the signal forwarding chaintest_on_kill_hook_called_when_sigkilledstill passes (no regression)test_kill_escalation_path,test_exit_by_signal) still pass