Skip to content

Commit

Permalink
Fix KubernetesPodTrigger startup timeout (#34579)
Browse files Browse the repository at this point in the history
  • Loading branch information
dzhigimont committed Sep 28, 2023
1 parent 729e5fb commit bd51200
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 2 deletions.
5 changes: 4 additions & 1 deletion airflow/providers/cncf/kubernetes/operators/pod.py
Expand Up @@ -679,7 +679,10 @@ def execute_complete(self, context: Context, event: dict, **kwargs):
return xcom_sidecar_output
finally:
istio_enabled = self.is_istio_enabled(pod)
pod = self.pod_manager.await_pod_completion(pod, istio_enabled, self.base_container_name)
# Skip await_pod_completion when the event is 'timeout': the pod can hang
# on the ErrImagePull or ContainerCreating step, in which case it will never complete
if event["status"] != "timeout":
pod = self.pod_manager.await_pod_completion(pod, istio_enabled, self.base_container_name)
if pod is not None:
self.post_complete_action(
pod=pod,
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/cncf/kubernetes/triggers/pod.py
Expand Up @@ -166,7 +166,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
elif self.should_wait(pod_phase=pod_status, container_state=container_state):
self.log.info("Container is not completed and still working.")

if pod_status == PodPhase.PENDING and container_state == ContainerState.UNDEFINED:
if pod_status == PodPhase.PENDING and container_state != ContainerState.RUNNING:
delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
if delta.total_seconds() >= self.startup_timeout:
message = (
Expand Down
24 changes: 24 additions & 0 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Expand Up @@ -1841,3 +1841,27 @@ class TestSubclassKPO(KubernetesPodOperator):

k = TestSubclassKPO(task_id="task")
assert k.container_logs == "test-base-container"


@patch(KUB_OP_PATH.format("post_complete_action"))
@patch(HOOK_CLASS)
@patch(KUB_OP_PATH.format("pod_manager"))
def test_async_skip_kpo_wait_termination_with_timeout_event(mock_manager, mocked_hook, post_complete_action):
    """Check how ``execute_complete`` reacts to a trigger event whose status is "timeout".

    Expected outcome: an ``AirflowException`` is raised, ``await_pod_completion``
    is never invoked on the pod manager, and ``post_complete_action`` runs
    exactly once.
    """
    # The mocked hook hands back a pod whose phase is still "Pending".
    pod_attributes = {
        "metadata.name": TEST_NAME,
        "metadata.namespace": TEST_NAMESPACE,
        "status.phase": "Pending",
    }
    mocked_hook.return_value.get_pod.return_value = mock.MagicMock(**pod_attributes)

    context = {"ti": MagicMock()}
    timeout_event = {
        "status": "timeout",
        "message": "timeout",
        "name": TEST_NAME,
        "namespace": TEST_NAMESPACE,
    }
    operator = KubernetesPodOperator(task_id="task", deferrable=True)

    # The timeout event must surface as a task failure.
    with pytest.raises(AirflowException):
        operator.execute_complete(context, timeout_event)

    # Waiting for pod completion has to be skipped for a timeout event ...
    mock_manager.await_pod_completion.assert_not_called()

    # ... while the post-completion cleanup still has to happen.
    post_complete_action.assert_called_once()
35 changes: 35 additions & 0 deletions tests/providers/cncf/kubernetes/triggers/test_pod.py
Expand Up @@ -27,6 +27,7 @@
from kubernetes.client import models as k8s

from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState, KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
from airflow.triggers.base import TriggerEvent

TRIGGER_PATH = "airflow.providers.cncf.kubernetes.triggers.pod.KubernetesPodTrigger"
Expand Down Expand Up @@ -334,3 +335,37 @@ def test_define_container_state_should_execute_successfully(
)

assert expected_state == trigger.define_container_state(pod)

@pytest.mark.asyncio
@pytest.mark.parametrize("container_state", [ContainerState.WAITING, ContainerState.UNDEFINED])
@mock.patch(f"{TRIGGER_PATH}.define_container_state")
@mock.patch(f"{TRIGGER_PATH}._get_async_hook")
async def test_run_loop_return_timeout_event(
    self, mock_hook, mock_method, trigger, caplog, container_state
):
    """Check that ``run`` yields a "timeout" event for a pod that stays Pending.

    Parametrized over the WAITING and UNDEFINED container states; in both
    cases the first event produced by the trigger must be the timeout event.
    """
    # Backdate the trigger start by two minutes
    # (presumably beyond STARTUP_TIMEOUT_SECS -- confirm against the fixture).
    two_minutes = datetime.timedelta(minutes=2)
    trigger.trigger_start_time = TRIGGER_START_TIME - two_minutes

    # The async hook reports a pod in the Pending phase.
    pending_pod = mock.MagicMock(status=mock.MagicMock(phase=PodPhase.PENDING))
    mock_hook.return_value.get_pod.return_value = self._mock_pod_result(pending_pod)
    mock_method.return_value = container_state

    caplog.set_level(logging.INFO)

    # Pull the first event out of the trigger's async generator.
    event_stream = trigger.run()
    actual = await event_stream.asend(None)

    expected_event = TriggerEvent(
        {
            "name": POD_NAME,
            "namespace": NAMESPACE,
            "status": "timeout",
            "message": f"Pod took longer than {STARTUP_TIMEOUT_SECS} seconds to start."
            " Check the pod events in kubernetes to determine why.",
        }
    )
    assert expected_event == actual

0 comments on commit bd51200

Please sign in to comment.