Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions providers/cncf/kubernetes/docs/changelog.rst
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not adding text to changelog in PRs - we alsways have this made by release managers. Please add the description to other documentstion pages.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@
Changelog
---------

**New config flags for KubernetesPodOperator zombie-pod cleanup.**
Two new ``[kubernetes_executor]`` options control periodic force-deletion of
zombie KPO pods — pods that keep Running in Kubernetes after their
TaskInstance has reached a terminal state (e.g. because a sidecar container
ignores SIGTERM):

- ``clean_zombie_kpo_pods`` (boolean, default ``True``) — enable/disable the
cleanup scan. Set to ``False`` if you prefer to manage pod lifecycle yourself.
- ``zombie_kpo_pod_cleanup_interval`` (integer seconds, default ``300``) — how
often to scan and force-delete zombie pods.

Zombie pods are force-deleted with ``grace_period_seconds=0`` so that stuck
sidecar containers cannot delay pod termination.

**Default xcom-sidecar image is now pinned to** ``alpine:3.23``.
The default container image for the xcom sidecar (used by ``KubernetesPodOperator``
when ``do_xcom_push=True``) has changed from the unpinned ``alpine`` (which resolves
Expand Down
21 changes: 21 additions & 0 deletions providers/cncf/kubernetes/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,27 @@ config:
type: string
example: ~
default: "False"
clean_zombie_kpo_pods:
description: |
If True, the Kubernetes Executor will periodically scan for
KubernetesPodOperator pods that are still Running in Kubernetes but
whose TaskInstance has reached a terminal state (zombie pods), and
force-delete them. This prevents sidecar containers that ignore
SIGTERM (e.g. the xcom sidecar) from keeping pods alive indefinitely
after the task has finished.
version_added: ~
type: boolean
example: ~
default: "True"
zombie_kpo_pod_cleanup_interval:
description: |
How often (in seconds) to scan for and force-delete zombie
KubernetesPodOperator pods. Only used when clean_zombie_kpo_pods
is True.
version_added: ~
type: integer
example: ~
default: "300"
worker_pod_pending_fatal_container_state_reasons:
description: |
If the worker pods are in a pending state due to a fatal container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def __init__(self, *args, **kwargs):
self.kube_client: client.CoreV1Api | None = None
self.scheduler_job_id: str | None = None
self._last_completed_pod_adoption = 0.0
self._last_zombie_kpo_cleanup = 0.0
self.last_handled: dict[TaskInstanceKey, float] = {}
self.kubernetes_queue: str | None = None
self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
Expand Down Expand Up @@ -274,6 +275,12 @@ def sync(self) -> None:
self._last_completed_pod_adoption = now
self._adopt_completed_pods(self.kube_client)

if self.kube_config.clean_zombie_kpo_pods:
zombie_interval = float(self.kube_config.zombie_kpo_pod_cleanup_interval)
if now - self._last_zombie_kpo_cleanup >= zombie_interval:
self._last_zombie_kpo_cleanup = now
self._cleanup_zombie_kpo_pods(self.kube_client)

if self.running:
self.log.debug("self.running: %s", self.running)
if self.queued_tasks:
Expand Down Expand Up @@ -817,6 +824,132 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
)
)

# Operators whose pods this cleanup manages. EksPodOperator and
# GKEStartPodOperator are intentionally excluded: their pods run in
# external clusters where this kube_client has no authority.
_KPO_OPERATORS: frozenset[str] = frozenset(
[
"KubernetesPodOperator",
"SparkKubernetesOperator",
]
)

@provide_session
def _cleanup_zombie_kpo_pods(
self, kube_client: client.CoreV1Api, *, session: Session = NEW_SESSION
) -> None:
"""Force-delete KubernetesPodOperator pods whose TaskInstance is no longer active.

A pod is a zombie when either:

- No matching non-terminal TaskInstance exists in the DB (the TI already
finished or was never recorded), or
- The pod's ``try_number`` label is less than the active TI's current
``try_number`` (the pod is a leftover from a previous retry attempt).

Force-deletion (``grace_period_seconds=0``) is used so that sidecar
containers that ignore SIGTERM cannot delay pod termination.
"""
from sqlalchemy import or_

from airflow.models.taskinstance import TaskInstance
from airflow.providers.cncf.kubernetes.pod_generator import make_safe_label_value
from airflow.utils.state import State

pod_list = self._list_pods({"label_selector": "kubernetes_pod_operator=True"})
if not pod_list:
return

# Parse pod labels. Values were already normalized by make_safe_label_value()
# at pod-creation time (operators/pod.py get_labels()), so they are ready to
# compare directly against normalized DB values.
pod_identities = []
for pod in pod_list:
labels = pod.metadata.labels or {}
dag_id = labels.get("dag_id")
task_id = labels.get("task_id")
run_id = labels.get("run_id")
if not (dag_id and task_id and run_id):
continue
map_index = int(labels.get("map_index", -1))
try_number = int(labels.get("try_number", 0))
pod_identities.append((pod, dag_id, task_id, run_id, map_index, try_number))

if not pod_identities:
return

# Single batch query: all non-terminal TIs for KPO-managed operators.
# We match on notin_(finished) rather than in_(unfinished) because SQL
# evaluates "NULL NOT IN (...)" as NULL, not TRUE — the explicit is_(None)
# arm ensures we also capture TIs with no state set yet.
terminal_state_values = [s.value for s in State.finished]
active_tis = session.execute(
select(
TaskInstance.dag_id,
TaskInstance.task_id,
TaskInstance.run_id,
TaskInstance.map_index,
TaskInstance.try_number,
).where(
or_(
TaskInstance.state.notin_(terminal_state_values),
TaskInstance.state.is_(None),
),
TaskInstance.operator.in_(self._KPO_OPERATORS),
)
).all()

# Build lookup: normalized (dag_id, task_id, run_id, map_index) → max try_number.
# DB values must be normalized identically to how the operator wrote pod labels.
active_lookup: dict[tuple[str, str, str, int], int] = {}
for ti in active_tis:
key = (
make_safe_label_value(ti.dag_id),
make_safe_label_value(ti.task_id),
make_safe_label_value(ti.run_id),
ti.map_index,
)
if key not in active_lookup or ti.try_number > active_lookup[key]:
active_lookup[key] = ti.try_number

from kubernetes.client import V1DeleteOptions
from kubernetes.client.rest import ApiException

for pod, dag_id, task_id, run_id, map_index, pod_try in pod_identities:
pod_key = (dag_id, task_id, run_id, map_index)
active_try = active_lookup.get(pod_key)

is_zombie = active_try is None or pod_try < active_try
if not is_zombie:
continue

self.log.warning(
"Force-deleting zombie KPO pod %s/%s "
"(dag_id=%s task_id=%s run_id=%s map_index=%d try_number=%d active_try=%s)",
pod.metadata.namespace,
pod.metadata.name,
dag_id,
task_id,
run_id,
map_index,
pod_try,
active_try,
)
try:
kube_client.delete_namespaced_pod(
pod.metadata.name,
pod.metadata.namespace,
body=V1DeleteOptions(grace_period_seconds=0),
)
except ApiException as e:
if e.status != 404:
self.log.warning(
"Failed to force-delete zombie KPO pod %s/%s: %s",
pod.metadata.namespace,
pod.metadata.name,
e,
)

def _flush_task_queue(self) -> None:
if TYPE_CHECKING:
assert self.task_queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ def __init__(self, executor_conf: ExecutorConf | None = None):
self.delete_worker_pods_on_failure = self._conf.getboolean(
self.kubernetes_section, "delete_worker_pods_on_failure"
)
self.clean_zombie_kpo_pods = self._conf.getboolean(
self.kubernetes_section, "clean_zombie_kpo_pods", fallback=True
)
self.zombie_kpo_pod_cleanup_interval = self._conf.getint(
self.kubernetes_section, "zombie_kpo_pod_cleanup_interval", fallback=300
)
self.worker_pod_pending_fatal_container_state_reasons: list[str] = []
fatal_reasons = self._conf.get(
self.kubernetes_section, "worker_pod_pending_fatal_container_state_reasons", fallback=""
Expand Down
Loading
Loading