From c47703103982ec4730ea28c8a5eda12ed2ce008a Mon Sep 17 00:00:00 2001 From: Sam Wheating Date: Thu, 24 Aug 2023 11:22:16 -0700 Subject: [PATCH] Inspect container state rather than last_state when deciding whether to skip (#33702) --- .../providers/cncf/kubernetes/operators/pod.py | 6 +++--- .../test_kubernetes_pod_operator.py | 18 +++++++++++++++++- .../cncf/kubernetes/operators/test_pod.py | 8 ++++---- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index e32e8b4c1f2f7..28d603e0eaef4 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -731,10 +731,10 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): (x for x in container_statuses if x.name == self.base_container_name), None ) exit_code = ( - base_container_status.last_state.terminated.exit_code + base_container_status.state.terminated.exit_code if base_container_status - and base_container_status.last_state - and base_container_status.last_state.terminated + and base_container_status.state + and base_container_status.state.terminated else None ) if exit_code in self.skip_on_exit_code: diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index a1182581f97d3..006ddfc788de1 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -35,7 +35,7 @@ from kubernetes.client.rest import ApiException from pytest import param -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowSkipException from airflow.models import DAG, Connection, DagRun, TaskInstance from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator @@ -225,6 +225,22 @@ def test_delete_operator_pod(self, mock_get_connection): assert self.expected_pod["spec"] == actual_pod["spec"] assert self.expected_pod["metadata"]["labels"] == actual_pod["metadata"]["labels"] + def test_skip_on_specified_exit_code(self, mock_get_connection): + k = KubernetesPodOperator( + namespace="default", + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["exit 42"], + task_id=str(uuid4()), + in_cluster=False, + do_xcom_push=False, + is_delete_operator_pod=True, + skip_on_exit_code=42, + ) + context = create_context(k) + with pytest.raises(AirflowSkipException): + k.execute(context) + def test_already_checked_on_success(self, mock_get_connection): """ When ``is_delete_operator_pod=False``, pod should have 'already_checked' diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index daf0ea90dd239..3ada6218a5329 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -1309,10 +1309,10 @@ def test_task_skip_when_pod_exit_with_certain_code( base_container = MagicMock() base_container.name = k.base_container_name - base_container.last_state.terminated.exit_code = actual_exit_code + base_container.state.terminated.exit_code = actual_exit_code sidecar_container = MagicMock() sidecar_container.name = "airflow-xcom-sidecar" - sidecar_container.last_state.terminated.exit_code = 0 + sidecar_container.state.terminated.exit_code = 0 remote_pod.return_value.status.container_statuses = [base_container, sidecar_container] remote_pod.return_value.status.phase = "Succeeded" if actual_exit_code == 0 else "Failed" @@ -1569,10 +1569,10 @@ def test_async_create_pod_with_skip_on_exit_code_should_skip( base_container = MagicMock() base_container.name = k.base_container_name - base_container.last_state.terminated.exit_code = actual_exit_code + base_container.state.terminated.exit_code = actual_exit_code sidecar_container = MagicMock() sidecar_container.name = "airflow-xcom-sidecar" - sidecar_container.last_state.terminated.exit_code = 0 + sidecar_container.state.terminated.exit_code = 0 remote_pod = MagicMock() remote_pod.status.phase = pod_status remote_pod.status.container_statuses = [base_container, sidecar_container]