diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index db10bff330b01..022e4f9bb79af 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -25,6 +25,7 @@ import pendulum import pytest from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, V1PodStatus, models as k8s +from kubernetes.client.exceptions import ApiException from urllib3 import HTTPResponse from airflow.exceptions import AirflowException, AirflowSkipException, TaskDeferred @@ -1998,6 +1999,31 @@ def test_async_write_logs_should_execute_successfully( else: mock_manager.return_value.read_pod_logs.assert_not_called() + @pytest.mark.parametrize("get_logs", [True]) + @patch(KUB_OP_PATH.format("post_complete_action")) + @patch(KUB_OP_PATH.format("extract_xcom")) + @patch(HOOK_CLASS) + @patch(KUB_OP_PATH.format("pod_manager")) + def test_async_write_logs_handler_api_exception( + self, mock_manager, mocked_hook, mock_extract_xcom, get_logs + ): + mock_manager.read_pod_logs.return_value = ApiException(status=404) + mocked_hook.return_value.get_pod.return_value = k8s.V1Pod( + metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE) + ) + mock_extract_xcom.return_value = "{}" + k = KubernetesPodOperator( + task_id="task", + get_logs=get_logs, + deferrable=True, + ) + self.run_pod_async(k) + mock_k = k + mock_k.log.warning = MagicMock(return_value='test') + + if get_logs: + mock_k.log.warning.assert_called_once() + @pytest.mark.parametrize( "log_pod_spec_on_failure,expect_match", [