KubernetesPodOperator does not enforce execution_timeout semantics in Deferrable mode #67227

@paultmathew

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes (current main, reproduces against pod.py at HEAD)

Apache Airflow version

main (also reproduces on Airflow 3.2.x)

Operating System

Linux (EKS)

Deployment

Other

Deployment details

KubernetesExecutor on EKS, deferrable mode enabled.

What happened

When using KubernetesPodOperator(deferrable=True) with execution_timeout set, the Airflow task does not time out and the underlying pod continues to run well past the configured execution_timeout.

In non-deferrable mode, exceeding execution_timeout raises AirflowTaskTimeout, the task fails, and on_kill() deletes the pod. In deferrable mode the task transitions to DEFERRED immediately after the pod is launched, the synchronous execute() returns, and the signal.alarm-based timeout context manager that wraps execute() exits cleanly. There is no further enforcement of execution_timeout for the lifetime of the deferral, so the trigger keeps polling the pod indefinitely (bounded only by active_deadline_seconds on the pod itself, which defaults to ~1h or whatever the operator passed in).
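The mechanism can be illustrated with a minimal standalone sketch. This is not Airflow's code: `alarm_timeout`, `TaskTimeout`, `TaskDeferred` and `run_task` are local stand-ins written for this example, and it assumes a Unix main thread because it relies on SIGALRM. It shows that a signal-based timeout only bounds the synchronous body, and is cancelled as soon as that body exits by raising a deferral exception.

```python
import signal
import time
from contextlib import contextmanager


class TaskTimeout(Exception):
    """Stand-in for a task timeout error (hypothetical, not the Airflow class)."""


class TaskDeferred(Exception):
    """Stand-in for the exception used to hand a task over to a trigger."""


@contextmanager
def alarm_timeout(seconds: float):
    """Raise TaskTimeout if the wrapped block runs longer than `seconds`."""

    def _on_alarm(signum, frame):
        raise TaskTimeout(f"timed out after {seconds}s")

    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Leaving the block for ANY reason, including deferral, cancels the timer.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


def run_task(execute, timeout_s: float) -> str:
    try:
        with alarm_timeout(timeout_s):
            execute()
        return "success"
    except TaskDeferred:
        return "deferred"
    except TaskTimeout:
        return "timed_out"


def blocking_execute():
    time.sleep(0.5)  # non-deferrable style: waits inside execute()


def deferring_execute():
    raise TaskDeferred()  # deferrable style: leaves execute() immediately


sync_state = run_task(blocking_execute, 0.05)
deferred_state = run_task(deferring_execute, 0.05)
time.sleep(0.1)  # well past the timeout; no alarm fires because it was cancelled
print(sync_state, deferred_state)
```

In this model the blocking variant is interrupted by the alarm, while the deferring variant leaves the timeout block before the alarm can fire and nothing bounds the wait that follows.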

This creates inconsistent behaviour between deferrable and non-deferrable execution modes.

This is the same class of issue addressed for DbtCloudRunJobOperator (#61467, PR #66449) and AirbyteTriggerSyncOperator (#64048, PR #64051).

What you think should happen instead

execution_timeout should be enforced consistently regardless of execution mode.

When a deferrable KubernetesPodOperator exceeds execution_timeout:

  • the Airflow task should fail due to execution timeout
  • the underlying pod should be deleted (kubelet sends SIGTERM, respecting terminationGracePeriodSeconds)

This ensures predictable timeout behaviour and prevents long-running or orphaned pods.

How to reproduce

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with DAG(
    dag_id="kpo_deferrable_execution_timeout_repro",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    KubernetesPodOperator(
        task_id="run_long_pod",
        namespace="default",
        image="alpine:3.20",
        cmds=["sh", "-c"],
        arguments=["sleep 1800"],
        deferrable=True,
        execution_timeout=timedelta(seconds=30),
    )
```

  1. Trigger the DAG.
  2. Observe that the task transitions to DEFERRED within seconds.
  3. Wait past the 30-second execution_timeout.

Observed behaviour

  • The Airflow task remains in DEFERRED state.
  • The pod continues running for the full 30 minutes (or until active_deadline_seconds fires, whichever comes first).
  • The task is never marked as failed due to timeout.

Expected behaviour

  • The Airflow task should be marked failed shortly after 30 seconds.
  • The pod should be deleted.

Anything else

Root cause

KubernetesPodOperator.execute() calls self.defer(trigger=trigger, method_name="trigger_reentry") (pod.py:952) without passing a timeout= kwarg. Consequently, the Trigger row has trigger_timeout=NULL, the triggerer's RunTrigger.timeout_after is None (triggerer_job_runner.py:786,795), and the trigger has no upper bound on its lifetime.

The framework-level execution_timeout enforcement is currently a no-op for any deferred task — the wrapping with timeout(...) block in task_runner.py:1789 only covers the synchronous portion of execute(), which exits cleanly when TaskDeferred is raised. There is a literal # TODO: handle timeout in case of deferral at task_runner.py:1782 acknowledging this gap.
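The effect of a missing trigger timeout can be modelled with plain asyncio. This is a simplified sketch, not the triggerer's implementation: `poll_pod_forever`, `run_trigger` and `observe` are hypothetical helpers, and `timeout_after` here is just a number of seconds standing in for the value the triggerer would hold.

```python
import asyncio
from typing import Optional


async def poll_pod_forever(poll_interval: float) -> None:
    """Stand-in for a trigger polling a pod that never finishes."""
    while True:
        await asyncio.sleep(poll_interval)


async def run_trigger(timeout_after: Optional[float]) -> str:
    """Run the polling loop; timeout_after=None means no upper bound."""
    try:
        await asyncio.wait_for(poll_pod_forever(0.01), timeout=timeout_after)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"


async def observe(timeout_after: Optional[float], window: float) -> str:
    """Watch the trigger for `window` seconds and report what happened."""
    task = asyncio.create_task(run_trigger(timeout_after))
    done, _ = await asyncio.wait({task}, timeout=window)
    if task in done:
        return task.result()
    # Still polling after the observation window: stop it so the demo can exit.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return "still running"


unbounded = asyncio.run(observe(timeout_after=None, window=0.2))
bounded = asyncio.run(observe(timeout_after=0.05, window=0.2))
print(unbounded, bounded)
```

With no timeout the loop is still polling when the observation window ends; with a timeout it is stopped at the bound, which is the behaviour a timeout= argument to defer() is meant to provide.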

This issue addresses the operator-specific symptom for KubernetesPodOperator, mirroring the pattern already merged for Airbyte and DbtCloud.

Proposed fix

Mirror the pattern from PR #64051:

  1. Compute an absolute execution_deadline from self.execution_timeout before deferring.
  2. Pass timeout=self.execution_timeout to self.defer(...) so the Trigger has a hard wait deadline.
  3. Pass execution_deadline (or equivalent) into KubernetesPodTrigger so it can emit a timeout event when the deadline is exceeded.
  4. Handle the timeout event in trigger_reentry / execute_complete, deleting the pod (best-effort; cancellation failures should be logged but not mask the timeout).
  5. The same logic must apply on re-deferral via trigger_reentry when logging_interval is set — each subsequent defer() should pass the remaining budget, not the full execution_timeout.
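The budget arithmetic behind steps 1, 2 and 5 can be sketched as follows. The helper names `compute_deadline` and `remaining_budget` are hypothetical, and the sketch assumes the absolute deadline is carried across deferrals somehow (for example through the trigger's arguments), which the actual fix would need to decide.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def compute_deadline(
    started_at: datetime, execution_timeout: Optional[timedelta]
) -> Optional[datetime]:
    """Absolute deadline for the task, or None when no execution_timeout is set."""
    if execution_timeout is None:
        return None
    return started_at + execution_timeout


def remaining_budget(
    deadline: Optional[datetime], now: datetime
) -> Optional[timedelta]:
    """Time left until the deadline, clamped at zero; None means unlimited."""
    if deadline is None:
        return None
    return max(deadline - now, timedelta(0))


started_at = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
deadline = compute_deadline(started_at, timedelta(minutes=30))

# First defer() happens a few seconds after the task starts.
first_defer = remaining_budget(deadline, started_at + timedelta(seconds=5))
# Re-deferral from trigger_reentry ten minutes in gets only what is left.
re_defer = remaining_budget(deadline, started_at + timedelta(minutes=10))
# Past the deadline the budget is zero rather than negative.
overdue = remaining_budget(deadline, started_at + timedelta(minutes=31))
# Without execution_timeout there is no deadline and no budget to pass.
no_limit = remaining_budget(compute_deadline(started_at, None), started_at)
```

Each value would be what gets passed as timeout= to the corresponding defer() call, so repeated re-deferrals cannot extend the task beyond the original execution_timeout.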

KubernetesPodTrigger already has safe_to_cancel/cleanup (Airflow ≤ 3.2) and the new BaseTrigger.on_kill() (Airflow 3.3.0+, PR #65590) for pod deletion, so the pod-cleanup half of the contract is already handled — only the execution_deadline plumbing is needed.
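Steps 3 and 4 of the proposed fix could look roughly like the sketch below. It is a simplified model and not the provider's code: `poll_pod`, `handle_event` and `PodTimeoutError` are hypothetical names, events are plain dicts rather than TriggerEvent objects, and the deadline is assumed to be an epoch timestamp so it survives serialization between processes.

```python
import asyncio
import logging
import time
from typing import Awaitable, Callable

log = logging.getLogger(__name__)


class PodTimeoutError(Exception):
    """Hypothetical error raised when the pod outlives execution_timeout."""


async def poll_pod(
    get_phase: Callable[[], Awaitable[str]],
    deadline_ts: float,
    poll_interval: float,
) -> dict:
    """Poll the pod phase until it terminates or the deadline passes."""
    while True:
        phase = await get_phase()
        if phase in ("Succeeded", "Failed"):
            return {"status": phase.lower()}
        if time.time() >= deadline_ts:
            return {"status": "timeout"}
        await asyncio.sleep(poll_interval)


def handle_event(event: dict, delete_pod: Callable[[], None]) -> dict:
    """Re-entry handler: on timeout, delete the pod best-effort, then fail."""
    if event["status"] == "timeout":
        try:
            delete_pod()
        except Exception:
            # A failed cleanup is logged but must not hide the timeout itself.
            log.exception("Pod deletion failed after execution timeout")
        raise PodTimeoutError("pod exceeded execution_timeout")
    return event


async def always_running() -> str:
    return "Running"  # fake pod that never finishes


attempts = []


def failing_delete() -> None:
    attempts.append("attempted")
    raise RuntimeError("Kubernetes API unavailable")


event = asyncio.run(poll_pod(always_running, time.time() + 0.05, 0.01))
try:
    handle_event(event, failing_delete)
    outcome = "no error"
except PodTimeoutError:
    outcome = "task failed on timeout"
```

The demo uses a pod that never finishes and a deletion call that fails, to show that the task still fails with the timeout error even when cleanup does not succeed.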

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

  • area:async-operators - AIP-40: Deferrable ("Async") Operators
  • kind:bug - This is a clearly a bug
  • priority:high - High priority bug that should be patched quickly but does not require immediate new release
  • provider:cncf-kubernetes - Kubernetes (k8s) provider related issues
