
KubernetesPodTrigger.get_task_state KeyError on mapped TIs (skips delete_pod) #67296

@paultmathew

Description

Under which category would you file this issue?

Providers

Apache Airflow version

3.2.1

What happened and how to reproduce it?

KubernetesPodTrigger.safe_to_cancel() calls get_task_state(), which calls
RuntimeTaskInstance.get_task_states(...) against the execution API's
/states endpoint and looks the response up by plain task_id:

# providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
return task_states_response[self.task_instance.run_id][self.task_instance.task_id]

But the API endpoint encodes the response key differently for mapped TIs
(one entry per (task_id, map_index) pair):

# airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
[
    run_id_task_state_map[task.run_id].update(
        {task.task_id: task.state}
        if task.map_index < 0
        else {f"{task.task_id}_{task.map_index}": task.state}
    )
    for task in results
]
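The mismatch can be reproduced outside Airflow. The sketch below rebuilds the response dict using the same key rule as the endpoint snippet above, then performs the trigger's bare task_id lookup. `TI` and `build_states_response` are hypothetical stand-ins, not Airflow classes. A plain loop replaces the side-effecting list comprehension. The assumption that the endpoint's map behaves like a `defaultdict(dict)` is inferred from the `.update(...)` call and is not confirmed.

```python
from collections import defaultdict
from typing import NamedTuple


class TI(NamedTuple):
    """Hypothetical stand-in for a task-instance row returned by the query."""
    run_id: str
    task_id: str
    map_index: int
    state: str


def build_states_response(results):
    """Mirror the endpoint's key encoding: bare task_id for unmapped TIs
    (map_index < 0), f"{task_id}_{map_index}" for mapped ones."""
    run_id_task_state_map = defaultdict(dict)
    for task in results:
        key = task.task_id if task.map_index < 0 else f"{task.task_id}_{task.map_index}"
        run_id_task_state_map[task.run_id][key] = task.state
    return dict(run_id_task_state_map)


RUN_ID = "manual__2026-05-21T17:43:39.041740+00:00"
response = build_states_response(
    [TI(RUN_ID, "map_group.task_a", i, "deferred") for i in range(3)]
)

print(sorted(response[RUN_ID]))
# ['map_group.task_a_0', 'map_group.task_a_1', 'map_group.task_a_2']

# The trigger's current lookup uses the bare task_id, so it misses.
try:
    response[RUN_ID]["map_group.task_a"]
except KeyError as exc:
    print("KeyError:", exc)  # KeyError: 'map_group.task_a'
```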

So for any mapped deferrable KubernetesPodOperator task (for example a
KubernetesPodOperator(..., deferrable=True).expand(...), or one nested
inside a @task_group.expand(...)), the trigger's lookup with
task_id="map_group.task_a" and map_index=2 looks for the key
"map_group.task_a", but the response only contains
"map_group.task_a_2". The result is a chain: KeyError → wrapped in AirflowException →
cleanup()'s broad except Exception defensively skips
hook.delete_pod() → the pod is never deleted on user-mark-failed and
stays alive until active_deadline_seconds expires (often hours).
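Here is a deliberately simplified model of that failure chain. `FakeHook`, `get_task_state`, `cleanup` and the `is_failed` predicate are all hypothetical stand-ins. The predicate is only a placeholder for whatever rule safe_to_cancel() actually applies, which this sketch does not reproduce. It shows just one thing: a broad `except Exception` turns a lookup miss into a skipped `delete_pod()`.

```python
class AirflowException(Exception):
    """Local stand-in so this sketch runs without Airflow installed."""


class FakeHook:
    """Hypothetical hook that only records which pods it was asked to delete."""

    def __init__(self):
        self.deleted = []

    def delete_pod(self, name):
        self.deleted.append(name)


def get_task_state(states_for_run, task_id):
    # Same shape as the trigger's lookup: KeyError is re-raised as AirflowException.
    try:
        return states_for_run[task_id]
    except KeyError:
        raise AirflowException(f"TaskInstance {task_id!r} is not found")


def cleanup(hook, pod_name, states_for_run, task_id, is_safe):
    # is_safe stands in for safe_to_cancel()'s decision; its real rule is not modeled.
    try:
        safe = is_safe(get_task_state(states_for_run, task_id))
    except Exception:
        # Broad catch: an unreadable state is treated as "not safe", so no delete.
        print("Could not determine task state during cleanup; skipping pod deletion to be safe.")
        return
    if safe:
        hook.delete_pod(pod_name)


def is_failed(state):
    # Hypothetical predicate, assumed for illustration only.
    return state == "failed"


hook = FakeHook()
states = {"map_group.task_a_2": "failed"}  # server-side key for map_index=2

cleanup(hook, "probe-pod-2", states, "map_group.task_a", is_failed)
print(hook.deleted)  # [] (bare task_id misses, so delete_pod is skipped)

cleanup(hook, "probe-pod-2", states, "map_group.task_a_2", is_failed)
print(hook.deleted)  # ['probe-pod-2']
```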

Log from a real staging run (Airflow 3.2.1 +
apache-airflow-providers-cncf-kubernetes==10.16.0):

[2026-05-21T17:46:25.889914Z] WARNING - Could not determine task state
during cleanup; skipping pod deletion to be safe.
AirflowException: ('TaskInstance with dag_id: %s, task_id: %s, run_id: %s
and map_index: %s is not found', 'platform_dag_behavior_test',
'map_group.task_a', 'manual__2026-05-21T17:43:39.041740+00:00', 2)
    File pod.py, line 434 in cleanup
    File pod.py, line 420 in safe_to_cancel
    File pod.py, line 401 in get_task_state
,KeyError: 'map_group.task_a'
    File pod.py, line 399 in get_task_state

The same trigger code is present on main, so the bug is not fixed there either.

Impact

Affects every mapped deferrable KPO task on Airflow 3.x with
apache-airflow-providers-cncf-kubernetes >= 10.15.0 (which is when
safe_to_cancel was introduced in #62401). It silently breaks the
mark-failed-deletes-pod contract: operators of mapped deferred KPO tasks
who hit "Mark Failed" in the UI see the pod stay Running until the pod
deadline. For long-lived deferrable tasks (e.g. continuous stream
pollers) this can cause overlapping-writer races against external
systems (Iceberg commits in our case).

What you think should happen instead?

get_task_state() should compose the lookup key the same way the API
server encodes it — appending the _{map_index} suffix when the TI is
mapped. That makes the lookup succeed for mapped TIs and produces the
correct safe_to_cancel() result, which lets cleanup() call
hook.delete_pod() and Mark Failed actually terminates the pod.

Suggested diff (matches the patch we are running as a local plugin
workaround):

--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ async def get_task_state(self):
             task_states_response = await sync_to_async(RuntimeTaskInstance.get_task_states)(
                 dag_id=self.task_instance.dag_id,
                 task_ids=[self.task_instance.task_id],
                 run_ids=[self.task_instance.run_id],
                 map_index=self.task_instance.map_index,
             )
+            # The /states endpoint suffixes the response key with
+            # `_{map_index}` for mapped TIs (see the dict-key construction
+            # in airflow-core/.../execution_api/routes/task_instances.py).
+            ti_key = (
+                f"{self.task_instance.task_id}_{self.task_instance.map_index}"
+                if self.task_instance.map_index >= 0
+                else self.task_instance.task_id
+            )
             try:
-                return task_states_response[self.task_instance.run_id][self.task_instance.task_id]
+                return task_states_response[self.task_instance.run_id][ti_key]
             except KeyError:
                 raise AirflowException(
                     "TaskInstance with dag_id: %s, task_id: %s, run_id: %s and map_index: %s is not found",
                     self.task_instance.dag_id,
                     self.task_instance.task_id,
                     self.task_instance.run_id,
                     self.task_instance.map_index,
                 )

Plus a unit test in
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
covering both the mapped and non-mapped branches.
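Below is a rough sketch of what the mapped and non-mapped cases could check. It is written against a standalone helper rather than the trigger itself. `ti_state_key` is a hypothetical extraction of the inline `ti_key` expression from the diff above, and `RESPONSE` is a hand-built dict in the endpoint's key format. A real test in test_pod.py would presumably mock RuntimeTaskInstance.get_task_states and call get_task_state() on a KubernetesPodTrigger instead.

```python
RUN_ID = "manual__2026-05-21T17:43:39.041740+00:00"

# Hand-built /states-style response: run_id -> {encoded key -> state}.
RESPONSE = {
    RUN_ID: {
        "plain_task": "deferred",
        "map_group.task_a_0": "deferred",
        "map_group.task_a_2": "failed",
    }
}


def ti_state_key(task_id: str, map_index: int) -> str:
    """Hypothetical helper: same expression as ti_key in the suggested diff."""
    return f"{task_id}_{map_index}" if map_index >= 0 else task_id


def lookup(task_id: str, map_index: int) -> str:
    return RESPONSE[RUN_ID][ti_state_key(task_id, map_index)]


def test_unmapped_ti_uses_plain_task_id():
    # Unmapped TIs carry map_index == -1 and keep the bare task_id key.
    assert ti_state_key("plain_task", -1) == "plain_task"
    assert lookup("plain_task", -1) == "deferred"


def test_mapped_ti_gets_map_index_suffix():
    assert ti_state_key("map_group.task_a", 2) == "map_group.task_a_2"
    assert lookup("map_group.task_a", 2) == "failed"


def test_map_index_zero_counts_as_mapped():
    # 0 is a valid map index; only negative values mean "not mapped".
    assert lookup("map_group.task_a", 0) == "deferred"


if __name__ == "__main__":
    test_unmapped_ti_uses_plain_task_id()
    test_mapped_ti_gets_map_index_suffix()
    test_map_index_zero_counts_as_mapped()
    print("all key-composition checks passed")
```

The boundary case at map_index=0 is worth keeping explicitly, because a `> 0` comparison in place of `>= 0` would silently reintroduce the bug for the first mapped instance.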


How to reproduce

Minimal repro DAG:

from datetime import datetime, timezone
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.sdk import dag, task_group


@task_group(group_id="map_group")
def group(idx: int):
    KubernetesPodOperator(
        task_id="probe",
        image="docker.io/library/alpine:3.20",
        cmds=["/bin/sh", "-c"],
        arguments=[f'echo idx={idx}; sleep 600; exit 0'],
        deferrable=True,
        on_kill_action="delete_pod",
    )


@dag(
    dag_id="kpo_mapped_safe_to_cancel_bug",
    schedule=None,
    start_date=datetime(2026, 1, 1, tzinfo=timezone.utc),
    catchup=False,
)
def bug_repro():
    group.expand(idx=[0, 1, 2])


bug_repro()

Steps:

  1. Trigger the DAG.
  2. Wait for the three map_group.probe mapped TIs to enter DEFERRED state
    (the pods are sleeping).
  3. Mark the dag run as Failed in the UI.
  4. Observe: pods stay Running for the full 600s sleep instead of being
    deleted. The triggerer log contains the "Could not determine task state
    during cleanup; skipping pod deletion to be safe." warning shown above.

Removing the .expand(...) (i.e. a single non-mapped KubernetesPodOperator)
makes the bug go away — non-mapped TIs use the plain task_id key in the
API response, so the trigger's lookup succeeds.

Operating System

Container base: apache/airflow:slim-3.2.1-python3.12, running on Amazon Linux 2 nodes (EKS, ARM64 / Graviton). Not OS-specific: the mismatch is purely Python-level between the trigger and the execution API.

Deployment

Official Apache Airflow Helm Chart

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==10.16.0

Official Helm Chart version

1.19.0

Kubernetes Version

Amazon EKS 1.33 (Kubernetes 1.33)

Helm Chart configuration

Not Applicable

Docker Image customizations

Not Applicable

Anything else?

Latent since apache-airflow-providers-cncf-kubernetes==10.15.0, where
the safe_to_cancel mechanism was introduced in #62401. Before 10.15
the trigger had no cleanup() / safe_to_cancel() and so the lookup
mismatch had no observable effect (the pod also wasn't deleted, but for
unrelated reasons).

We are currently running a local Airflow plugin that monkey-patches
KubernetesPodTrigger.get_task_state with the diff above as a
workaround. Happy to share if useful, but the upstream fix is small
enough that a plugin shouldn't be needed long-term.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata


Assignees

No one assigned

Labels

area:providers, kind:bug, needs-triage, priority:high, provider:cncf-kubernetes
