Skip to content

Commit

Permalink
Inspect container state rather than last_state when deciding whether …
Browse files Browse the repository at this point in the history
…to skip (#33702)
  • Loading branch information
SamWheating committed Aug 24, 2023
1 parent f971ba2 commit c477031
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 8 deletions.
6 changes: 3 additions & 3 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,10 +731,10 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
(x for x in container_statuses if x.name == self.base_container_name), None
)
exit_code = (
base_container_status.last_state.terminated.exit_code
base_container_status.state.terminated.exit_code
if base_container_status
and base_container_status.last_state
and base_container_status.last_state.terminated
and base_container_status.state
and base_container_status.state.terminated
else None
)
if exit_code in self.skip_on_exit_code:
Expand Down
18 changes: 17 additions & 1 deletion kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from kubernetes.client.rest import ApiException
from pytest import param

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models import DAG, Connection, DagRun, TaskInstance
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
Expand Down Expand Up @@ -225,6 +225,22 @@ def test_delete_operator_pod(self, mock_get_connection):
assert self.expected_pod["spec"] == actual_pod["spec"]
assert self.expected_pod["metadata"]["labels"] == actual_pod["metadata"]["labels"]

def test_skip_on_specified_exit_code(self, mock_get_connection):
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["exit 42"],
task_id=str(uuid4()),
in_cluster=False,
do_xcom_push=False,
is_delete_operator_pod=True,
skip_on_exit_code=42,
)
context = create_context(k)
with pytest.raises(AirflowSkipException):
k.execute(context)

def test_already_checked_on_success(self, mock_get_connection):
"""
When ``is_delete_operator_pod=False``, pod should have 'already_checked'
Expand Down
8 changes: 4 additions & 4 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -1309,10 +1309,10 @@ def test_task_skip_when_pod_exit_with_certain_code(

base_container = MagicMock()
base_container.name = k.base_container_name
base_container.last_state.terminated.exit_code = actual_exit_code
base_container.state.terminated.exit_code = actual_exit_code
sidecar_container = MagicMock()
sidecar_container.name = "airflow-xcom-sidecar"
sidecar_container.last_state.terminated.exit_code = 0
sidecar_container.state.terminated.exit_code = 0
remote_pod.return_value.status.container_statuses = [base_container, sidecar_container]
remote_pod.return_value.status.phase = "Succeeded" if actual_exit_code == 0 else "Failed"

Expand Down Expand Up @@ -1569,10 +1569,10 @@ def test_async_create_pod_with_skip_on_exit_code_should_skip(

base_container = MagicMock()
base_container.name = k.base_container_name
base_container.last_state.terminated.exit_code = actual_exit_code
base_container.state.terminated.exit_code = actual_exit_code
sidecar_container = MagicMock()
sidecar_container.name = "airflow-xcom-sidecar"
sidecar_container.last_state.terminated.exit_code = 0
sidecar_container.state.terminated.exit_code = 0
remote_pod = MagicMock()
remote_pod.status.phase = pod_status
remote_pod.status.container_statuses = [base_container, sidecar_container]
Expand Down

0 comments on commit c477031

Please sign in to comment.