From f82fef6bffb8d9ee1e943da2d81c247eb91e6d3c Mon Sep 17 00:00:00 2001 From: Daniel Wolf <95075445+wolfdn@users.noreply.github.com> Date: Mon, 30 Mar 2026 07:03:41 +0000 Subject: [PATCH 1/2] Add retries for `_write_logs` method in `KubernetesPodOperator` --- .../cncf/kubernetes/operators/pod.py | 61 +++++++++-------- .../cncf/kubernetes/operators/test_pod.py | 66 ++++++++++++++++++- 2 files changed, 99 insertions(+), 28 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py index e1609a187e84b..88c3c4f71731b 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -102,6 +102,8 @@ from airflow.providers.cncf.kubernetes.secret import Secret from airflow.sdk import Context +log = logging.getLogger(__name__) + alphanum_lower = string.ascii_lowercase + string.digits KUBE_CONFIG_ENV_VAR = "KUBECONFIG" @@ -972,7 +974,14 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: if event["status"] in ("error", "failed", "timeout", "success"): if self.get_logs: - self._write_logs(self.pod, follow=follow, since_time=last_log_time) + try: + self._write_logs(self.pod, follow=follow, since_time=last_log_time) + except (HTTPError, ApiException) as e: + self.log.warning( + "Reading of logs interrupted with error %r. " + "Set log level to DEBUG for traceback.", + e if not isinstance(e, ApiException) else e.reason, + ) for callback in self.callbacks: callback.on_pod_completion( @@ -1035,32 +1044,32 @@ def _clean(self, event: dict[str, Any], result: dict | None, context: Context) - result=result, ) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(max=15), + retry=tenacity.retry_if_exception_type((HTTPError, ApiException)), + before_sleep=tenacity.before_sleep_log(log, logging.WARNING), + reraise=True, + ) def _write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime | None = None) -> None: - try: - since_seconds = ( - math.ceil((datetime.datetime.now(tz=datetime.timezone.utc) - since_time).total_seconds()) - if since_time - else None - ) - logs = self.client.read_namespaced_pod_log( - name=pod.metadata.name, - namespace=pod.metadata.namespace, - container=self.base_container_name, - follow=follow, - timestamps=False, - since_seconds=since_seconds, - _preload_content=False, - ) - for raw_line in logs: - line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n") - if line: - self.log.info("[%s] logs: %s", self.base_container_name, line) - except (HTTPError, ApiException) as e: - self.log.warning( - "Reading of logs interrupted with error %r; will retry. " - "Set log level to DEBUG for traceback.", - e if not isinstance(e, ApiException) else e.reason, - ) + since_seconds = ( + math.ceil((datetime.datetime.now(tz=datetime.timezone.utc) - since_time).total_seconds()) + if since_time + else None + ) + logs = self.client.read_namespaced_pod_log( + name=pod.metadata.name, + namespace=pod.metadata.namespace, + container=self.base_container_name, + follow=follow, + timestamps=False, + since_seconds=since_seconds, + _preload_content=False, + ) + for raw_line in logs: + line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n") + if line: + self.log.info("[%s] logs: %s", self.base_container_name, line) def post_complete_action( self, *, pod: k8s.V1Pod, remote_pod: k8s.V1Pod, context: Context, result: dict | None, **kwargs diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py index bc9742688ac8c..6f7b79fcd3b12 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py @@ -25,6 +25,7 @@ import pendulum import pytest +import tenacity from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, V1PodStatus, models as k8s from kubernetes.client.exceptions import ApiException @@ -45,7 +46,7 @@ AirflowSkipException, TaskDeferred, ) -from airflow.utils import timezone +from airflow.sdk import timezone from airflow.utils.session import create_session from airflow.utils.types import DagRunType @@ -2764,7 +2765,7 @@ def test_async_write_logs_should_execute_successfully( @patch(HOOK_CLASS) @patch(KUB_OP_PATH.format("pod_manager")) def test_async_write_logs_handler_api_exception( - self, mock_manager, mocked_hook, mock_extract_xcom, post_complete_action, mocked_client + self, mock_manager, mocked_hook, mock_extract_xcom, mocked_client, post_complete_action ): mocked_client.read_namespaced_pod_log.side_effect = ApiException(status=404) mock_manager.await_pod_completion.side_effect = ApiException(status=404) @@ -2777,9 +2778,70 @@ def test_async_write_logs_handler_api_exception( get_logs=True, deferrable=True, ) + # Patch tenacity wait to avoid real delays from _write_logs retries + k._write_logs.retry.wait = tenacity.wait_none() self.run_pod_async(k) post_complete_action.assert_not_called() + @patch(KUB_OP_PATH.format("post_complete_action")) + @patch(KUB_OP_PATH.format("client")) + @patch(HOOK_CLASS) + @patch(KUB_OP_PATH.format("pod_manager")) + def test_write_logs_retries_on_api_exception( + self, mock_manager, mocked_hook, mocked_client, post_complete_action + ): + """Test that _write_logs retries on ApiException and succeeds on subsequent attempt.""" + test_logs = b"log line\n" + mocked_client.read_namespaced_pod_log.side_effect = [ + ApiException(status=500), + [test_logs], + ] + mock_manager.await_pod_completion.return_value = k8s.V1Pod( + metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE) + ) + mocked_hook.return_value.get_pod.return_value = k8s.V1Pod( + metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE) + ) + k = KubernetesPodOperator( + task_id="task", + get_logs=True, + deferrable=True, + ) + # Patch tenacity wait to avoid real delays in tests + k._write_logs.retry.wait = tenacity.wait_none() + self.run_pod_async(k) + assert mocked_client.read_namespaced_pod_log.call_count == 2 + post_complete_action.assert_called_once() + + @patch(KUB_OP_PATH.format("post_complete_action")) + @patch(KUB_OP_PATH.format("client")) + @patch(HOOK_CLASS) + @patch(KUB_OP_PATH.format("pod_manager")) + def test_write_logs_gives_up_after_max_retries( + self, mock_manager, mocked_hook, mocked_client, post_complete_action, caplog + ): + """Test that _write_logs gives up after 3 failed attempts and trigger_reentry catches the error.""" + mocked_client.read_namespaced_pod_log.side_effect = ApiException(status=500) + mock_manager.await_pod_completion.return_value = k8s.V1Pod( + metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE) + ) + mocked_hook.return_value.get_pod.return_value = k8s.V1Pod( + metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE) + ) + k = KubernetesPodOperator( + task_id="task", + get_logs=True, + deferrable=True, + ) + # Patch tenacity wait to avoid real delays in tests + k._write_logs.retry.wait = tenacity.wait_none() + self.run_pod_async(k) + # 3 attempts (stop_after_attempt(3)) + assert mocked_client.read_namespaced_pod_log.call_count > 1 + # trigger_reentry catches the error and continues; post_complete_action still called via _clean + post_complete_action.assert_called_once() + assert "Reading of logs interrupted with error" in caplog.text + @pytest.mark.parametrize( ("log_pod_spec_on_failure", "expect_match"), [ From a9a3981f61f6ed4313d9f3090b382461e51e51a0 Mon Sep 17 00:00:00 2001 From: Daniel Wolf <95075445+wolfdn@users.noreply.github.com> Date: Tue, 31 Mar 2026 05:55:36 +0000 Subject: [PATCH 2/2] Reset changes to timezone --- .../kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py index 6f7b79fcd3b12..412e0e08a63bc 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py @@ -46,7 +46,7 @@ AirflowSkipException, TaskDeferred, ) -from airflow.sdk import timezone +from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.types import DagRunType