From 28adce5ec719006a49688c7283412c3df6a7992f Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Sat, 4 Nov 2023 17:13:36 -0700 Subject: [PATCH 1/4] Move "reconcile containers" logic to own method for clarity Easier to follow this way. --- .../cncf/kubernetes/utils/pod_manager.py | 67 ++++++++++++------- .../cncf/kubernetes/utils/test_pod_manager.py | 2 +- 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 2d5abec0e96a8..943af47be5672 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -476,53 +476,68 @@ def consume_logs( else: # follow requested, but container is done break - def fetch_requested_container_logs( - self, pod: V1Pod, container_logs: Iterable[str] | str | Literal[True], follow_logs=False - ) -> None: - """ - Follow the logs of containers in the specified pod and publish it to airflow logging. - - Returns when all the containers exit. - """ - all_containers = self.get_container_names(pod) - if all_containers: - if isinstance(container_logs, str): + def _reconcile_requested_log_containers( + self, requested: Iterable[str] | str | bool, actual: list[str], pod_name + ) -> list[str]: + """Return actual containers based on requested.""" + containers_to_log = [] + if actual: + if isinstance(requested, str): # fetch logs only for requested container if only one container is provided - if container_logs in all_containers: - self.fetch_container_logs(pod=pod, container_name=container_logs, follow=follow_logs) + if requested in actual: + containers_to_log.append(requested) else: self.log.error( "container %s whose logs were requested not found in the pod %s", - container_logs, - pod.metadata.name, + requested, + pod_name, ) - elif isinstance(container_logs, bool): + elif isinstance(requested, bool): # if True is provided, get logs for all the containers - if container_logs is True: - for container_name in all_containers: - self.fetch_container_logs(pod=pod, container_name=container_name, follow=follow_logs) + if requested is True: + containers_to_log.extend(actual) else: self.log.error( "False is not a valid value for container_logs", ) else: # if a sequence of containers are provided, iterate for every container in the pod - if isinstance(container_logs, Iterable): - for container in container_logs: - if container in all_containers: - self.fetch_container_logs(pod=pod, container_name=container, follow=follow_logs) + if isinstance(requested, Iterable): + for container in requested: + if container in actual: + containers_to_log.append(container) else: self.log.error( "Container %s whose logs were requests not found in the pod %s", container, - pod.metadata.name, + pod_name, ) else: self.log.error( - "Invalid type %s specified for container names input parameter", type(container_logs) + "Invalid type %s specified for container names input parameter", type(requested) ) else: - self.log.error("Could not retrieve containers for the pod: %s", pod.metadata.name) + self.log.error("Could not retrieve containers for the pod: %s", pod_name) + return containers_to_log + + def fetch_requested_container_logs( + self, pod: V1Pod, containers: Iterable[str] | str | Literal[True], follow_logs=False + ) -> None: + """ + Follow the logs of containers in the specified pod and publish it to airflow logging. + + Returns when all the containers exit. + + :meta private: + """ + all_containers = self.get_container_names(pod) + containers_to_log = self._reconcile_requested_log_containers( + requested=containers, + actual=all_containers, + pod_name=pod.metadata.name, + ) + for c in containers_to_log: + self.fetch_container_logs(pod=pod, container_name=c, follow=follow_logs) def await_container_completion(self, pod: V1Pod, container_name: str) -> None: """ diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index 11dce0e17b474..e6f071559b031 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -421,7 +421,7 @@ def test_fetch_requested_container_logs(self, container_is_running, container_lo ) self.pod_manager.fetch_requested_container_logs( - pod=mock_pod, container_logs=container_logs, follow_logs=follow + pod=mock_pod, containers=container_logs, follow_logs=follow ) calls = {tuple(x[1].values()) for x in container_is_running.call_args_list} pod = self.pod_manager.read_pod.return_value From 36c986f9a16246087d5d0ad240a4d64c71db980c Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Sat, 4 Nov 2023 17:20:48 -0700 Subject: [PATCH 2/4] fixup! Move "reconcile containers" logic to own method for clarity --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 943af47be5672..75ff82fc2fa73 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -381,6 +381,8 @@ def fetch_container_logs( Between when the pod starts and logs being available, there might be a delay due to CSR not approved and signed yet. In such situation, ApiException is thrown. This is why we are retrying on this specific exception. + + :meta private: """ @tenacity.retry( From 522e5b82c81f90146d7cc8ff0253edebe404661a Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Sat, 4 Nov 2023 17:22:26 -0700 Subject: [PATCH 3/4] fixup! fixup! Move "reconcile containers" logic to own method for clarity --- airflow/providers/cncf/kubernetes/operators/pod.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 5b58b1bedb687..58e54a72d3650 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -611,7 +611,7 @@ def execute_sync(self, context: Context): if self.get_logs: self.pod_manager.fetch_requested_container_logs( pod=self.pod, - container_logs=self.container_logs, + containers=self.container_logs, follow_logs=True, ) if not self.get_logs or ( From caeebe1bbbd71ba59ae62159ad40f3c41aab2759 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 7 Nov 2023 07:21:25 -0800 Subject: [PATCH 4/4] fix test --- tests/providers/cncf/kubernetes/operators/test_pod.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index 4ad631ac4e604..ae49d7fe7d031 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -1371,9 +1371,7 @@ def test_get_logs_but_not_for_base_container( pod = self.run_pod(k) # check that the base container is not included in the logs - mock_fetch_log.assert_called_once_with( - pod=pod, container_logs=["some_init_container"], follow_logs=True - ) + mock_fetch_log.assert_called_once_with(pod=pod, containers=["some_init_container"], follow_logs=True) # check that KPO waits for the base container to complete before proceeding to extract XCom mock_await_container_completion.assert_called_once_with(pod=pod, container_name="base") # check that we wait for the xcom sidecar to start before extracting XCom