Fix KubernetesPodTrigger.get_task_state KeyError on mapped TIs (#67296) #67297

Merged
jscheffl merged 1 commit into apache:main from paultmathew:fix/67296-kpo-trigger-mapped-ti-key on May 21, 2026

Conversation

@paultmathew
Contributor

Fix a key-shape mismatch between the execution API's /states endpoint and
KubernetesPodTrigger.get_task_state: the endpoint suffixes the response
key with _{map_index} for mapped TIs, but the trigger looked the value
up by plain task_id, so the lookup raised KeyError for every mapped
deferrable KubernetesPodOperator task. cleanup()'s broad
except Exception swallowed the error and, to be safe, skipped
hook.delete_pod() -- so Mark Failed in the UI never deleted the pod of a
mapped deferrable KPO task. The pod stayed Running until
active_deadline_seconds expired (often hours).
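
The failure mode described above can be reproduced in isolation. The following is a minimal sketch, not the provider's code: `get_task_state_buggy`, `cleanup`, and the rule "delete unless the task is still deferred" are hypothetical stand-ins chosen for illustration.

```python
from typing import Callable


def get_task_state_buggy(states: dict[str, str], task_id: str) -> str:
    # Looks the state up by plain task_id. For a mapped TI the response is
    # keyed by "<task_id>_<map_index>", so this raises KeyError.
    return states[task_id]


def cleanup(states: dict[str, str], task_id: str, delete_pod: Callable[[], None]) -> bool:
    """Return True if the pod was deleted, False if deletion was skipped."""
    try:
        state = get_task_state_buggy(states, task_id)
    except Exception:
        # Broad handler: the KeyError is swallowed and deletion is skipped.
        return False
    # Assumed rule for this sketch: only keep the pod while still deferred.
    if state != "deferred":
        delete_pod()
        return True
    return False


deleted: list[str] = []
# Response shaped like the one for a mapped TI with map_index=2.
mapped_response = {"map_group.task_a_2": "failed"}
cleanup(mapped_response, "map_group.task_a", lambda: deleted.append("pod"))
# `deleted` is still empty here: the pod would keep running.
```

The point of the sketch is that nothing surfaces to the caller: the only visible symptom is a pod that is never deleted.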

For continuously polling deferrable tasks expanded via .expand(...), this
also caused overlapping-writer races against external systems on the next
schedule, because the failed run's pod was still alive when the next run
spawned its own pod.

This PR composes the lookup key with the _{map_index} suffix when the
TI is mapped, matching how the API serialises the response.
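
A minimal sketch of that key composition, under stated assumptions: `compose_state_key` is a hypothetical helper name, and the sketch assumes an unmapped TI carries `map_index == -1`. The key shapes themselves come from the description above.

```python
def compose_state_key(task_id: str, map_index: int = -1) -> str:
    """Build the key under which a TI's state is expected in the response."""
    # Assumption: a negative map_index means the TI is not mapped.
    if map_index >= 0:
        # Mapped TI: the response key carries the _{map_index} suffix.
        return f"{task_id}_{map_index}"
    # Unmapped TI: the response is keyed by plain task_id.
    return task_id


states = {"map_group.task_a_2": "failed", "plain_task": "running"}
mapped_state = states[compose_state_key("map_group.task_a", 2)]
plain_state = states[compose_state_key("plain_task")]
```

Keeping the composition in one place means the lookup and the response format can only drift apart in a single spot.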

closes: #67296

Test plan

Unit tests

Three new tests in providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py:

  • test_get_task_state_uses_task_id_for_non_mapped_ti -- regression guard for the non-mapped branch (response keyed by plain task_id).
  • test_get_task_state_uses_composite_key_for_mapped_ti -- the bug repro: mapped TI with task_id="map_group.task_a" + map_index=2 returns the state stored under key "map_group.task_a_2".
  • test_get_task_state_raises_when_mapped_key_missing -- pins the wrapped AirflowException shape so callers (e.g. safe_to_cancel's except Exception) keep matching.
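
The three cases above can be sketched against a standalone stand-in. This is not the PR's test code: `TaskStateLookupError` is a hypothetical substitute for the wrapped AirflowException, `get_task_state` here takes a plain dict instead of calling the execution API, and `map_index == -1` for unmapped TIs is an assumption.

```python
class TaskStateLookupError(Exception):
    """Hypothetical stand-in for the wrapped AirflowException."""


def get_task_state(states: dict[str, str], task_id: str, map_index: int = -1) -> str:
    # Assumption: map_index == -1 marks an unmapped TI.
    key = f"{task_id}_{map_index}" if map_index >= 0 else task_id
    try:
        return states[key]
    except KeyError as err:
        # Wrap so callers see one exception type, with the KeyError chained.
        raise TaskStateLookupError(f"no state under key {key!r}") from err


def check_non_mapped() -> None:
    # Regression guard: unmapped TIs are still looked up by plain task_id.
    assert get_task_state({"task_a": "running"}, "task_a") == "running"


def check_mapped() -> None:
    # Bug repro: the state lives under the composite key.
    states = {"map_group.task_a_2": "failed"}
    assert get_task_state(states, "map_group.task_a", 2) == "failed"


def check_missing_key_is_wrapped() -> None:
    # A missing key must surface as the wrapper type, not a bare KeyError.
    try:
        get_task_state({}, "map_group.task_a", 2)
    except Exception as err:  # broad handler, as a caller might use
        assert isinstance(err, TaskStateLookupError)
        assert isinstance(err.__cause__, KeyError)
    else:
        raise AssertionError("expected a lookup error")
```

Pinning the exception type in the third check matters because a caller that inspects the type, rather than catching everything, would otherwise break silently if the wrapping changed.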

Local run:

```
uv run --no-sync --project providers/cncf/kubernetes pytest \
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py \
-k "get_task_state or safe_to_cancel"
```

Result: 5 passed, 2 skipped, 1 warning in 2.40s (2 skips are legacy Airflow < 3.3 paths unrelated to this PR).

End-to-end smoke test on an EKS sandbox cluster

Airflow 3.2.1 with apache-airflow-providers-cncf-kubernetes==10.16.0, deferrable KPO tasks inside a mapped @task_group.expand(...) (the same shape as the issue's repro DAG).

Before this PR (provider 10.16.0 unpatched) -- one of three TI logs after Mark Failed:

```
[17:46:25.889914Z] WARNING - Could not determine task state during cleanup;
skipping pod deletion to be safe.
AirflowException: ('TaskInstance with dag_id: %s, ...', '...',
'map_group.task_a', '...', 2)
,KeyError: 'map_group.task_a'
File pod.py, line 399 in get_task_state
```

Pods stayed Running for the full 600s sleep; nothing would have appeared in the kubelet event stream until active_deadline_seconds expired (7200s here).

After this PR -- same DAG, same Mark Failed:

```
18:17:54 pod: starting task_a-0 (duration=600s exit_code=0) (x3)
18:20:07 kubelet: Killing: Stopping container base (x3)
18:20:07 pod: received SIGTERM (x3)
18:20:07 runtime: Task ... deleted with exit code 0 (x3)
```

End-to-end Mark Failed -> delete_pod -> kubelet SIGTERM -> pod-side
trap -> container exit 0 -> kubelet cleanup completes in ~1 second for
all three mapped TIs. The previous Could not determine task state warning
no longer fires.

Static checks

  • prek run ruff --files <changed>: pass
  • prek run ruff-format --files <changed>: pass

Was generative AI tooling used to co-author this PR?
  • Yes -- Cursor / Claude Opus 4.7

Generated-by: Cursor / Claude Opus 4.7 following the guidelines

Made with Cursor

…e#67296)

The execution API's /states endpoint encodes the response key as
``f"{task_id}_{map_index}"`` for mapped TIs but the trigger was looking
the value up by plain ``task_id``. For any mapped deferrable
KubernetesPodOperator task that lookup raised KeyError, which
cleanup()'s broad ``except Exception`` swallowed and skipped
``hook.delete_pod()`` -- so Mark Failed in the UI left the pod running
until ``active_deadline_seconds`` expired.

Compose the lookup key with the ``_{map_index}`` suffix when the TI is
mapped, matching how the API serialises the response. cleanup() now
sees the real state, ``safe_to_cancel()`` returns the right value, and
mark-failed actually deletes the pod within the grace period.

Co-authored-by: Cursor <cursoragent@cursor.com>
boring-cyborg (bot) added the area:providers and provider:cncf-kubernetes (Kubernetes (k8s) provider related issues) labels on May 21, 2026
paultmathew marked this pull request as ready for review on May 21, 2026 20:17
jscheffl merged commit a73c626 into apache:main on May 21, 2026
218 checks passed
Development

Successfully merging this pull request may close these issues.

KubernetesPodTrigger.get_task_state KeyError on mapped TIs (skips delete_pod)

2 participants