From d15b0d54dcafbdeb7c31d2d4ca182302f22ad229 Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Wed, 4 May 2022 23:34:00 +0530 Subject: [PATCH 1/8] Fixed defect --- .../providers/cncf/kubernetes/utils/pod_manager.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 993ba12e313f..31cdef64cb32 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -368,20 +368,25 @@ def extract_xcom(self, pod: V1Pod) -> str: _preload_content=False, ) ) as resp: - result = self._exec_pod_command(resp, f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json') + result = self._exec_pod_command(resp, f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json', True) self._exec_pod_command(resp, 'kill -s SIGINT 1') if result is None: raise AirflowException(f'Failed to extract xcom from pod: {pod.metadata.name}') return result - def _exec_pod_command(self, resp, command: str) -> Optional[str]: + def _exec_pod_command(self, resp, command: str, extract_full_content=False) -> Optional[str]: if resp.is_open(): self.log.info('Running command... %s\n', command) resp.write_stdin(command + '\n') while resp.is_open(): resp.update(timeout=1) + res = None if resp.peek_stdout(): - return resp.read_stdout() + res = resp.read_stdout() + if extract_full_content: + while resp.peek_stdout(): + res = res + resp.read_stdout() + return res if resp.peek_stderr(): self.log.info("stderr from command: %s", resp.read_stderr()) break From 8d1a652e49aa1b1cee33cd510d33fb20ce777826 Mon Sep 17 00:00:00 2001 From: rahulgoyal2987 Date: Thu, 5 May 2022 01:38:56 +0530 Subject: [PATCH 2/8] Update pod_manager.py --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 31cdef64cb32..8ffe06712509 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -383,9 +383,9 @@ def _exec_pod_command(self, resp, command: str, extract_full_content=False) -> O res = None if resp.peek_stdout(): res = resp.read_stdout() - if extract_full_content: - while resp.peek_stdout(): - res = res + resp.read_stdout() + if extract_full_content: + while resp.peek_stdout(): + res = res + resp.read_stdout() return res if resp.peek_stderr(): self.log.info("stderr from command: %s", resp.read_stderr()) From 756ddf784fe763c937ee2f5ff37b7155ea844a19 Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Sat, 7 May 2022 07:25:36 +0530 Subject: [PATCH 3/8] Fixed review comments --- .../cncf/kubernetes/utils/pod_manager.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 8ffe06712509..ae8066f41b52 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -368,26 +368,27 @@ def extract_xcom(self, pod: V1Pod) -> str: _preload_content=False, ) ) as resp: - result = self._exec_pod_command(resp, f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json', True) + result = self._exec_pod_command(resp, f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json') self._exec_pod_command(resp, 'kill -s SIGINT 1') if result is None: raise AirflowException(f'Failed to extract xcom from pod: {pod.metadata.name}') return result - def _exec_pod_command(self, resp, command: str, extract_full_content=False) -> Optional[str]: + def _exec_pod_command(self, resp, command: str) -> Optional[str]: if resp.is_open(): self.log.info('Running command... %s\n', command) resp.write_stdin(command + '\n') while resp.is_open(): resp.update(timeout=1) res = None - if resp.peek_stdout(): - res = resp.read_stdout() - if extract_full_content: - while resp.peek_stdout(): - res = res + resp.read_stdout() + while resp.peek_stdout(): + res = res + resp.read_stdout() + if res: return res - if resp.peek_stderr(): - self.log.info("stderr from command: %s", resp.read_stderr()) + error_res = None + while resp.peek_stderr(): + error_res = error_res + resp.read_stderr() + if error_res: + self.log.info("stderr from command: %s", error_res) break return None From 72c1dfb9c83bde9455b139ea40a8a5d439c97b39 Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Wed, 4 May 2022 23:34:00 +0530 Subject: [PATCH 4/8] Fixed defect --- .../providers/cncf/kubernetes/utils/pod_manager.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 993ba12e313f..31cdef64cb32 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -368,20 +368,25 @@ def extract_xcom(self, pod: V1Pod) -> str: _preload_content=False, ) ) as resp: - result = self._exec_pod_command(resp, f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json') + result = self._exec_pod_command(resp, f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json', True) self._exec_pod_command(resp, 'kill -s SIGINT 1') if result is None: raise AirflowException(f'Failed to extract xcom from pod: {pod.metadata.name}') return result - def _exec_pod_command(self, resp, command: str) -> Optional[str]: + def _exec_pod_command(self, resp, command: str, extract_full_content=False) -> Optional[str]: if resp.is_open(): self.log.info('Running command... %s\n', command) resp.write_stdin(command + '\n') while resp.is_open(): resp.update(timeout=1) + res = None if resp.peek_stdout(): - return resp.read_stdout() + res = resp.read_stdout() + if extract_full_content: + while resp.peek_stdout(): + res = res + resp.read_stdout() + return res if resp.peek_stderr(): self.log.info("stderr from command: %s", resp.read_stderr()) break From 12ac959d31986601d49d897a422304b17471a271 Mon Sep 17 00:00:00 2001 From: rahulgoyal2987 Date: Thu, 5 May 2022 01:38:56 +0530 Subject: [PATCH 5/8] Update pod_manager.py --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 31cdef64cb32..8ffe06712509 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -383,9 +383,9 @@ def _exec_pod_command(self, resp, command: str, extract_full_content=False) -> O res = None if resp.peek_stdout(): res = resp.read_stdout() - if extract_full_content: - while resp.peek_stdout(): - res = res + resp.read_stdout() + if extract_full_content: + while resp.peek_stdout(): + res = res + resp.read_stdout() return res if resp.peek_stderr(): self.log.info("stderr from command: %s", resp.read_stderr()) From 21e2c727c3a42a3cb2c7f126d05a41f2ffb524eb Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Sat, 7 May 2022 07:25:36 +0530 Subject: [PATCH 6/8] Fixed review comments --- .../cncf/kubernetes/utils/pod_manager.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 8ffe06712509..ae8066f41b52 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -368,26 +368,27 @@ def extract_xcom(self, pod: V1Pod) -> str: _preload_content=False, ) ) as resp: - result = self._exec_pod_command(resp, f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json', True) + result = self._exec_pod_command(resp, f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json') self._exec_pod_command(resp, 'kill -s SIGINT 1') if result is None: raise AirflowException(f'Failed to extract xcom from pod: {pod.metadata.name}') return result - def _exec_pod_command(self, resp, command: str, extract_full_content=False) -> Optional[str]: + def _exec_pod_command(self, resp, command: str) -> Optional[str]: if resp.is_open(): self.log.info('Running command... %s\n', command) resp.write_stdin(command + '\n') while resp.is_open(): resp.update(timeout=1) res = None - if resp.peek_stdout(): - res = resp.read_stdout() - if extract_full_content: - while resp.peek_stdout(): - res = res + resp.read_stdout() + while resp.peek_stdout(): + res = res + resp.read_stdout() + if res: return res - if resp.peek_stderr(): - self.log.info("stderr from command: %s", resp.read_stderr()) + error_res = None + while resp.peek_stderr(): + error_res = error_res + resp.read_stderr() + if error_res: + self.log.info("stderr from command: %s", error_res) break return None From 08db0774f7ba637cfba93a09618d6d2aa9056311 Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Tue, 10 May 2022 09:37:23 +0530 Subject: [PATCH 7/8] Fixed defects --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index ae8066f41b52..6622590844d2 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -380,15 +380,15 @@ def _exec_pod_command(self, resp, command: str) -> Optional[str]: resp.write_stdin(command + '\n') while resp.is_open(): resp.update(timeout=1) - res = None + res = "" while resp.peek_stdout(): res = res + resp.read_stdout() - if res: - return res error_res = None while resp.peek_stderr(): error_res = error_res + resp.read_stderr() if error_res: self.log.info("stderr from command: %s", error_res) break + if res: + return res return None From 6fe3a9839b95666effa641b333cf979383d2d848 Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Tue, 10 May 2022 09:58:40 +0530 Subject: [PATCH 8/8] Fixed defects --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index da2f7416869d..46e593a2c93d 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -375,20 +375,20 @@ def extract_xcom(self, pod: V1Pod) -> str: return result def _exec_pod_command(self, resp, command: str) -> Optional[str]: + res = None if resp.is_open(): self.log.info('Running command... %s\n', command) resp.write_stdin(command + '\n') while resp.is_open(): resp.update(timeout=1) - res = "" while resp.peek_stdout(): - res = res + resp.read_stdout() - error_res = "" + res = res + resp.read_stdout() if res else resp.read_stdout() + error_res = None while resp.peek_stderr(): - error_res = error_res + resp.read_stderr() + error_res = error_res + resp.read_stderr() if error_res else resp.read_stderr() if error_res: self.log.info("stderr from command: %s", error_res) break if res: return res - return None + return res