
Task-runner's venv / Popen subprocesses become orphans on heartbeat 409; supervisor also crashes with ValueError: I/O operation on closed epoll object #65505

@cmettler

Description

Under which category would you file this issue?

Airflow Core

Apache Airflow version

3.2.0

What happened and how to reproduce it?

TL;DR

Trigger: a running TaskInstance's state gets set to failed while the task is actually still executing. In practice this happens when:

  • the scheduler runs adopt_or_reset_orphaned_tasks and resets a TI whose heartbeat looks stale (e.g. after a scheduler restart, or under worker load),
  • someone (or an external system) PATCHes the TI state to failed via the REST API,
  • any other code path transitions a running TI out of running without going through the normal task-runner exit.

The next heartbeat PUT from the still-running task-runner then returns HTTP 409, which triggers the broken kill path.

On Airflow 3.2.0 + CeleryExecutor, two things happen as a result:

  1. Any subprocess the task-runner spawned (e.g. the @task.virtualenv python process) is reparented to PID 1 and keeps running, wasting CPU, RAM, and third-party API quota. No code path ever reaps it.
  2. ~60 s later the supervisor crashes with ValueError: I/O operation on closed epoll object, so Celery reports the task as raised, on top of the orphan.

Related issues: #50500 (closed), #50507 (closed), #58562 (open). The ValueError part of this bug is a regression from the fix for #50500 (PR #51180, commit a2651f17d43, merged 2025-05-29) — see "Two problems, two origins" below. The orphan part is older and structural (no parent-death signal, no process-group kill).

Detailed sequence

A long-running @task.virtualenv task (or any task using PythonVirtualenvOperator, ExternalPythonOperator, DockerOperator, BashOperator, or Cosmos dbt) is supervised by airflow-sdk's supervise() / _monitor_subprocess() loop.

The scheduler resets the TaskInstance to failed. The next heartbeat PUT returns HTTP 409. _send_heartbeat_if_needed() (supervisor.py:1189) responds with self.kill(signal.SIGTERM, force=True), which SIGTERM→SIGKILL-escalates the task-runner.
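The SIGTERM→SIGKILL escalation behaviour can be sketched generically (this is an illustrative stand-in, not the supervisor's actual kill() implementation; the `kill_with_escalation` helper and its `grace` parameter are hypothetical names):

```python
import signal
import subprocess
import time


def kill_with_escalation(proc: subprocess.Popen, grace: float = 5.0) -> int:
    """Send SIGTERM, wait `grace` seconds, then escalate to SIGKILL."""
    for sig in (signal.SIGTERM, signal.SIGKILL):
        proc.send_signal(sig)
        try:
            proc.wait(timeout=grace)
            return proc.returncode
        except subprocess.TimeoutExpired:
            continue  # escalate to the next signal
    return proc.wait()


# A child that ignores SIGTERM, forcing the escalation path.
p = subprocess.Popen(["sh", "-c", "trap '' TERM; sleep 3"])
time.sleep(0.2)
rc = kill_with_escalation(p, grace=1.0)
print(rc)  # -9: SIGTERM was ignored, SIGKILL landed
```

Note that, as in the supervisor, only the direct child PID is signalled here; any grandchildren the child spawned are untouched, which is the orphan problem described below.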

Two bad things happen:

1. Supervisor crashes with ValueError

Approximately 60 s later (one [workers] socket_cleanup_timeout), _monitor_subprocess() invokes _cleanup_open_sockets() because one socket ("requests") is still open. That closes the selector. The next iteration of _service_subprocess() calls self.selector.select(timeout=timeout) on the closed selector and raises ValueError.
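The race can be shown in isolation: on Linux, selectors.DefaultSelector is an EpollSelector, and calling select() after close() raises exactly this ValueError (a standalone sketch, not supervisor code):

```python
import selectors
import socket

sel = selectors.DefaultSelector()          # EpollSelector on Linux
r, w = socket.socketpair()
sel.register(r, selectors.EVENT_READ)

# What _cleanup_open_sockets() effectively does while the loop still runs:
sel.close()

# What the next _service_subprocess() iteration then does:
try:
    sel.select(timeout=0)
    crashed = False
except ValueError as e:
    crashed = True
    print(e)  # on Linux: I/O operation on closed epoll object

r.close()
w.close()
```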

Full traceback from the Celery worker log:

Traceback (most recent call last):
  File ".../celery/app/trace.py", line 585, in trace_task
    R = retval = fun(*args, **kwargs)
  File ".../celery/app/trace.py", line 858, in __protected_call__
    return self.run(*args, **kwargs)
  File ".../airflow/providers/celery/executors/celery_executor_utils.py", line 202, in execute_workload
    supervise(...)
  File ".../airflow/sdk/execution_time/supervisor.py", line 2107, in supervise
    exit_code = process.wait()
  File ".../airflow/sdk/execution_time/supervisor.py", line 1062, in wait
    self._monitor_subprocess()
  File ".../airflow/sdk/execution_time/supervisor.py", line 1127, in _monitor_subprocess
    alive = self._service_subprocess(max_wait_time=max_wait_time) is None
  File ".../airflow/sdk/execution_time/supervisor.py", line 791, in _service_subprocess
    events = self.selector.select(timeout=timeout)
  File "/usr/python/lib/python3.12/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
ValueError: I/O operation on closed epoll object

2. The venv / Popen subprocess is left as an orphan

The supervisor's kill() signals only the task-runner. Subprocesses the task-runner itself spawned (e.g. via python_virtualenv._execute_in_subprocess for @task.virtualenv) are neither signalled nor reaped. When the task-runner dies, those grandchildren are reparented to PID 1 and keep running until they finish on their own or are killed manually, consuming CPU, RAM, and third-party API quota.

Verified in /proc: immediately after the "Task killed!" log line, the venv subprocess is alive with State: S (sleeping) and ppid=1.
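The reparenting itself is easy to demonstrate outside Airflow (Linux-only sketch, nothing Airflow-specific): when the intermediate process dies, its child's PPid in /proc flips to 1 (or the nearest subreaper):

```python
import subprocess
import time

# An intermediate shell spawns a background sleep (the "grandchild"),
# prints its PID, then exits -- mimicking the task-runner being killed.
child = subprocess.Popen(
    ["sh", "-c", "sleep 30 & echo $!"],
    stdout=subprocess.PIPE, text=True,
)
grandchild = int(child.stdout.readline())
child.wait()          # the intermediate is gone
time.sleep(0.5)       # give the kernel time to reparent

with open(f"/proc/{grandchild}/status") as f:
    fields = dict(line.split(":", 1) for line in f if ":" in line)
ppid = int(fields["PPid"])
print(ppid)           # no longer child.pid; 1 (or a subreaper PID)

subprocess.run(["kill", "-9", str(grandchild)])  # clean up the orphan
```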

Timeline of one incident

TaskInstance 019da136-26e0-7eb1-..., task-runner PID 45780, venv PID 45843:

15:30:03  task-runner spawns venv, venv begins its work
15:31:48  apiserver: PUT /execution/task-instances/.../heartbeat → 409
15:31:48  supervisor: "Server indicated the task shouldn't be running anymore"
15:31:53  supervisor: "Process did not terminate in time; escalating"   (SIGTERM→SIGKILL)
15:31:58  supervisor: "Process exited exit_code=-9"                     (task-runner dead)
                      venv now has ppid=1
15:32:58  supervisor: "Process exited with open sockets; cleaning up after timeout"
                      (60 s SOCKET_CLEANUP_TIMEOUT elapsed)
15:32:58  supervisor: "Force-closed stuck sockets"                      (closes selector)
15:32:58  ValueError: I/O operation on closed epoll object              (crash)
~ 30 min  venv finally finishes its internal for loop and exits

Two problems, two origins

(1) The orphan is the older, structural problem. A Popen subprocess spawned by the task-runner (e.g. in providers/standard/utils/python_virtualenv._execute_in_subprocess) is not placed in a new session, has no preexec_fn registering PR_SET_PDEATHSIG, and is never signalled when the supervisor's kill() reaches the task-runner. As soon as the task-runner dies, the Popen child is reparented to PID 1 and keeps running. This behaviour predates the task-sdk refactor.
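The pgid-sharing half of this is straightforward to check (a standalone sketch, not provider code): a default Popen child stays in its parent's process group, so a signal delivered only to one PID never reaches the rest of the tree, whereas start_new_session makes the child a group leader:

```python
import os
import subprocess

# Default Popen: the child stays in the caller's process group/session,
# like the venv subprocess spawned by _execute_in_subprocess.
p = subprocess.Popen(["sleep", "1"])
shared = os.getpgid(p.pid) == os.getpgid(0)
print(shared)   # True: shared pgid, unreachable via killpg on its own group

# With start_new_session=True (os.setsid() in the child) it leads its own
# group -- which is what the proposed fix gives the task-runner.
q = subprocess.Popen(["sleep", "1"], start_new_session=True)
leader = os.getpgid(q.pid) == q.pid
print(leader)   # True: own group leader

p.wait()
q.wait()
```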

(2) The ValueError: I/O operation on closed epoll object is a regression introduced by PR #51180.

git blame on task-sdk/src/airflow/sdk/execution_time/supervisor.py shows that before commit a2651f17d43 ("Fix lingering task supervisors when EOF is missed", PR #51180, merged 2025-05-29) there was exactly one self.selector.close() call in the file — the finally of wait() at line 1064. That one site ran only after the monitor loop exited, so a race was not possible.

PR #51180 added _cleanup_open_sockets() (supervisor.py:677-695) and invokes it from inside _monitor_subprocess() (supervisor.py:1144). That is the second close site, and it fires while the loop is still running. Combined with the pre-existing kill(force=True) call path that re-enters _service_subprocess(), the crash became reachable.

Before PR #51180 the orphan bug would cause the supervisor to hang forever on the closed-but-not-flagged pipe (exactly the symptom #50500 tried to fix). #51180 fixed the hang but made the orphan visible as a Celery-level traceback, because the cleanup path now closes the selector out from under the still-running monitor loop.

Repro DAG

Save as dags/test_supervisor_orphan_repro.py:

"""Minimal reproduction for the supervisor-epoll / orphan-venv bug.

Drop into `dags/`, unpause, trigger manually, wait until the venv task
has started (~30 s), then PATCH the TaskInstance to `failed` via REST
(see steps below).  The supervisor will crash with ValueError ~60 s
later; the venv child keeps running as an orphan (ppid=1) until it
finishes its own loop ~30 min later.

Watch /tmp/orphan_test_pid_<pid>.log on the worker — it keeps growing
past the "Task killed!" supervisor log line, and the `ppid=` value
flips from the task-runner PID to 1 at that moment.
"""

from __future__ import annotations

from datetime import datetime

from airflow.sdk import dag, task


@dag(
    dag_id="test_supervisor_orphan_repro",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["supervisor-epoll-bug"],
)
def test_supervisor_orphan_repro():

    @task.virtualenv(
        requirements=[],
        venv_cache_path="/opt/airflow/venv_cache",
        system_site_packages=True,
    )
    def long_running_step() -> dict:
        import logging
        import os
        import time

        pid = os.getpid()
        signal_log = f"/tmp/orphan_test_pid_{pid}.log"
        with open(signal_log, "a") as f:
            f.write(f"{time.time():.3f} pid={pid} start ppid={os.getppid()}\n")

        log = logging.getLogger("repro")
        log.setLevel(logging.INFO)

        # 60 * 30 s = 30 min, well past default task_instance_heartbeat_timeout (300 s)
        for i in range(60):
            ppid = os.getppid()
            with open(signal_log, "a") as f:
                f.write(f"{time.time():.3f} pid={pid} step={i} ppid={ppid}\n")
            log.info("pid=%s step=%d/60 ppid=%s", pid, i, ppid)
            time.sleep(30)

        return {"pid": pid, "steps": 60}

    long_running_step()


test_supervisor_orphan_repro()

Steps

  1. Airflow 3.2.0 + CeleryExecutor.

  2. Save the DAG above; unpause; trigger manually.

  3. Wait until a line appears in /tmp/orphan_test_pid_<pid>.log on the worker (~30 s).

  4. PATCH the TaskInstance state to failed via REST:

    PATCH /api/v2/dags/test_supervisor_orphan_repro/dagRuns/{run_id}/taskInstances/long_running_step
    Content-Type: application/json
    { "new_state": "failed" }
    
  5. Within ~5 s, the next heartbeat returns 409 and the supervisor logs "Server indicated the task shouldn't be running anymore".

  6. ~60 s later (one [workers] socket_cleanup_timeout), docker logs airflow-worker shows the ValueError: I/O operation on closed epoll object.

  7. cat /tmp/orphan_test_pid_<pid>.log on the worker continues to grow after "Task killed!" was logged, with ppid=1 instead of the task-runner PID — the venv is an orphan.

  8. docker exec airflow-worker cat /proc/<pid>/status → process still alive with State: S (sleeping) and PPid: 1.

What you think should happen instead?

Neither orphans nor a supervisor crash should happen when a running TaskInstance is transitioned out of running by the server.

Expected:

  1. The task-runner and any subprocesses it spawned (venv children, Docker exec, Bash shells, Cosmos dbt runs, …) terminate promptly when the supervisor's kill() fires.
  2. The supervisor itself exits cleanly — supervise() returns a non-zero exit code so Celery reports the task as a normal failure, not as raised unexpected.

Operating System

Linux (official Airflow Docker image)

Deployment

Docker-Compose

Apache Airflow Provider(s)

celery

Versions of Apache Airflow Providers

apache-airflow-providers-celery | 3.17.1

Official Helm Chart version

Not Applicable

Kubernetes Version

No response

Helm Chart configuration

No response

Docker Image customizations

Based on the official apache/airflow:3.2.0-python3.12 image with only system/tooling additions and user-level Python packages — no modifications to airflow-sdk / task-sdk / supervisor source.

FROM apache/airflow:3.2.0-python3.12
USER root
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        libxcb1 libgl1 libglib2.0-0 \
        postgresql-client wget unzip \
        dotnet-sdk-10.0 powershell
    # ... install duckdb CLI ...
USER airflow
COPY requirements.txt /
RUN pip install --no-cache-dir apache-airflow==${AIRFLOW_VERSION} -r /requirements.txt

Anything else?

Proposed fix — process-group kill in airflow-sdk

Fixes both problems in airflow-sdk alone, covers every operator/subprocess type, and does not require any provider-side changes.

In task-sdk/src/airflow/sdk/execution_time/supervisor.py:

@@ WatchedSubprocess.start() — right after os.fork()
         pid = os.fork()
         if pid == 0:
+            # Put the task-runner into its own session so the supervisor can
+            # deliver signals to the whole tree via os.killpg().
+            with suppress(OSError):
+                os.setsid()
             cls._close_unused_sockets(...)
             ...

@@ WatchedSubprocess.kill() — inside the escalation loop
         for sig in escalation_path:
             try:
-                self._process.send_signal(sig)
+                # Signal the whole process group so subprocesses the
+                # task-runner spawned (venv children, Docker exec, bash
+                # shells, etc.) are also reached.
+                try:
+                    os.killpg(os.getpgid(self._process.pid), sig)
+                except (ProcessLookupError, PermissionError):
+                    self._process.send_signal(sig)

Why this works:

  • os.setsid() makes the task-runner its own session+pgid leader. Any Popen() it executes (which doesn't itself set start_new_session) inherits that pgid.
  • os.killpg() on SIGTERM reaches both the task-runner and the venv subprocess.
  • The venv subprocess has no SIGTERM handler → terminates immediately → closes the inherited "requests" pipe → supervisor sees EOF on that socket → _open_sockets drains → _cleanup_open_sockets() is never invoked → selector never gets closed mid-loop → no ValueError → no orphan.
  • The task-runner's own _on_term handler (task_runner.py:1243) still runs, so ti.task.on_kill() gets called for cleanup. No SIGKILL escalation needed.
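The killpg behaviour the patch relies on can be demonstrated independently (a standalone Linux sketch, not the actual supervisor code):

```python
import os
import signal
import subprocess
import time

# Stand-in for the task-runner: its own session leader, spawning a
# background "venv" grandchild that inherits the new pgid.
child = subprocess.Popen(
    ["sh", "-c", "sleep 300 & echo $!; wait"],
    stdout=subprocess.PIPE, text=True,
    start_new_session=True,   # what os.setsid() after fork() achieves
)
grandchild = int(child.stdout.readline())
assert os.getpgid(grandchild) == child.pid   # inherited the new pgid

# The supervisor-side kill from the proposed patch:
os.killpg(os.getpgid(child.pid), signal.SIGTERM)
child.wait()
time.sleep(0.5)


def alive(pid: int) -> bool:
    """True if the process exists and is not a zombie."""
    try:
        with open(f"/proc/{pid}/stat") as f:
            return f.read().rsplit(")", 1)[1].split()[0] not in ("Z", "X")
    except FileNotFoundError:
        return False


print(alive(grandchild))  # False: the grandchild died with the group
```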

Verified on DEV against Airflow 3.2.0 + CeleryExecutor: the exact same 409-on-heartbeat reproduction path that previously produced the ValueError plus an orphan now ends with Celery reporting succeeded in 27-35 s, task-runner exit code 0, and no orphans.

proposed_fix.patch
proposed_fix_killpg.patch

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

    Labels

    area:core, kind:bug (this is clearly a bug), needs-triage (not yet triaged), priority:critical (showstopper bug that should be patched immediately)

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions