
Scheduler becomes zombie when run_job raises unhandled exception #32706

@xuganyu96

Description


Apache Airflow version

2.6.3

What happened

Context

When the backend database shuts down (for maintenance, for example), the Airflow scheduler's main loop crashes, but the scheduler process does not exit. In my company's setup, the scheduler process is monitored by supervisord; because the process never exits, supervisord does not detect the failure, causing a prolonged scheduler outage.

Root cause

In airflow/cli/commands/scheduler_command.py, the entry point of the airflow scheduler command is the _run_scheduler_job function. When _run_scheduler_job is called, depending on the configuration, two sub-processes, serve_logs and/or health_check, may be started. The life cycle of these two sub-processes is managed by context managers, so that when each context exits, its sub-process is terminated:

def _run_scheduler_job(job_runner: SchedulerJobRunner, *, skip_serve_logs: bool) -> None:
    InternalApiConfig.force_database_direct_access()
    enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
    with _serve_logs(skip_serve_logs), _serve_health_check(enable_health_check):
        run_job(job=job_runner.job, execute_callable=job_runner._execute)

@contextmanager
def _serve_logs(skip_serve_logs: bool = False):
    """Starts serve_logs sub-process."""
    from airflow.utils.serve_logs import serve_logs

    sub_proc = None
    executor_class, _ = ExecutorLoader.import_default_executor_cls()
    if executor_class.serve_logs:
        if skip_serve_logs is False:
            sub_proc = Process(target=serve_logs)
            sub_proc.start()
    yield
    if sub_proc:
        sub_proc.terminate()


@contextmanager
def _serve_health_check(enable_health_check: bool = False):
    """Starts serve_health_check sub-process."""
    sub_proc = None
    if enable_health_check:
        sub_proc = Process(target=serve_health_check)
        sub_proc.start()
    yield
    if sub_proc:
        sub_proc.terminate()

The misbehavior happens when run_job raises an unhandled exception. The exception takes over the control flow, so the context managers never reach the code after their yield statements and terminate() is never called. When the main Python process tries to exit, the multiprocessing module tries to wind down all child processes (https://github.com/python/cpython/blob/1e1f4e91a905bab3103250a3ceadac0693b926d9/Lib/multiprocessing/util.py#L320C43-L320C43) by calling join() on each of them. Because the serve_logs and/or health_check sub-processes were never terminated, join() hangs indefinitely, leaving the scheduler in a zombie state.
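The context-manager half of this can be reproduced in isolation: with contextlib.contextmanager, code after a bare yield is skipped entirely when the with-body raises, so cleanup never runs (a minimal illustration, not Airflow code):

```python
from contextlib import contextmanager

cleanup_ran = []

@contextmanager
def leaky_resource():
    yield  # no try/finally: the line below is skipped if the with-body raises
    cleanup_ran.append(True)

try:
    with leaky_resource():
        raise RuntimeError("run_job crashed")
except RuntimeError:
    pass

# cleanup_ran is still empty here: the append after yield never executed
```

This is exactly why sub_proc.terminate() in _serve_logs and _serve_health_check is skipped when run_job raises.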

Note that this behavior was introduced in 2.5.0 (2.4.3 does not have this issue): before that, the two sub-processes were not managed with context managers, and the scheduler job was placed inside a try/except/finally block.

What you think should happen instead

The scheduler process should never hang. If something goes wrong, such as a database disconnect, the scheduler should simply crash and let whatever manages the scheduler process handle the respawn.

As for how this should be achieved, I think the best way is to place run_job inside a try/except block so that any exception can be caught and gracefully handled, although I am open to feedback.

How to reproduce

To reproduce the scheduler zombie state

Start an Airflow cluster with breeze (any Airflow version at or later than 2.5.0):

breeze start-airflow --python 3.9 --backend postgres

After the command opens the tmux windows, stop the postgres container: docker stop docker-compose-postgres-1

The webserver will not do anything. The triggerer correctly crashes and exits. The scheduler crashes but does not exit.

To reproduce the context manager's failure to exit

from multiprocessing import Process
from contextlib import contextmanager
import time

def busybox():
    time.sleep(24 * 3600)  # the entire day

@contextmanager
def some_resource():
    subproc = Process(target=busybox)
    subproc.start()
    print(f"Sub-process {subproc} started")
    yield
    subproc.terminate()
    subproc.join()
    print(f"Sub-process {subproc} terminated")

def main():
    with some_resource():
        raise Exception("Oops")


if __name__ == "__main__":
    main()
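For contrast, wrapping the yield in try/finally makes the same demo exit cleanly: the exception still propagates out of main(), but the sub-process has already been terminated and joined, so the interpreter's exit-time join() returns immediately. (This version also yields the Process object, purely so the caller can inspect it; the original yields nothing.)

```python
from multiprocessing import Process
from contextlib import contextmanager
import time

def busybox():
    time.sleep(24 * 3600)  # the entire day

@contextmanager
def some_resource():
    subproc = Process(target=busybox)
    subproc.start()
    print(f"Sub-process {subproc} started")
    try:
        yield subproc
    finally:
        # runs even when the with-body raises
        subproc.terminate()
        subproc.join()
        print(f"Sub-process {subproc} terminated")
```

Running main() from the snippet above against this version still prints the "Oops" traceback, but the process exits immediately instead of hanging.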

Operating System

macOS with Docker Desktop

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
