From 127c0725b9eb7c8be015ac10d74f963e3d6383ae Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Wed, 27 Dec 2023 19:50:53 +0100 Subject: [PATCH] Revert "Remove remaining Airflow 2.5 backcompat code from GCS Task Handler (#36443)" (#36453) This reverts commit 75faf1115d990746784e25280c0b326b3b557b86. --- .../google/cloud/log/gcs_task_handler.py | 41 ++++++++++++++++++- .../google/cloud/log/test_gcs_task_handler.py | 9 ++-- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index abc2bc8845b9a..9921bb8753f03 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -26,6 +26,7 @@ # not sure why but mypy complains on missing `storage` but it is clearly there and is importable from google.cloud import storage # type: ignore[attr-defined] +from packaging.version import Version from airflow.configuration import conf from airflow.exceptions import AirflowNotFoundException @@ -47,6 +48,18 @@ logger = logging.getLogger(__name__) +def get_default_delete_local_copy(): + """Load delete_local_logs conf if Airflow version > 2.6 and return False if not. + + TODO: delete this function when min airflow version >= 2.6. + """ + from airflow.version import version + + if Version(version) < Version("2.6"): + return False + return conf.getboolean("logging", "delete_local_logs") + + class GCSTaskHandler(FileTaskHandler, LoggingMixin): """ GCSTaskHandler is a python log handler that handles and reads task instance logs. 
@@ -95,8 +108,8 @@ def __init__( self.gcp_keyfile_dict = gcp_keyfile_dict self.scopes = gcp_scopes self.project_id = project_id - self.delete_local_copy = kwargs.get( - "delete_local_copy", conf.getboolean("logging", "delete_local_logs") + self.delete_local_copy = ( + kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy() ) @cached_property @@ -205,6 +218,30 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], l messages.append(f"Unable to read remote log {e}") return messages, logs + def _read(self, ti, try_number, metadata=None): + """ + Read logs of given task instance and try_number from GCS. + + If failed, read the log from task instance host machine. + + todo: when min airflow version >= 2.6, remove this method + + :param ti: task instance object + :param try_number: task instance try_number to read logs from + :param metadata: log metadata, + can be used for streaming log reading and auto-tailing. + """ + if hasattr(super(), "_read_remote_logs"): + # from Airflow 2.6, we don't implement the `_read` method. + # if parent has _read_remote_logs, we're >= 2.6 + return super()._read(ti, try_number, metadata) + + messages, logs = self._read_remote_logs(ti, try_number, metadata) + if not logs: + return super()._read(ti, try_number, metadata) + + return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs), {"end_of_log": True} + def gcs_write(self, log, remote_log_location) -> bool: """ Write the log to the remote location and return `True`; fail silently and return `False` on error. 
diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py index a3e929b985334..2d4dd7340d805 100644 --- a/tests/providers/google/cloud/log/test_gcs_task_handler.py +++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py @@ -248,8 +248,8 @@ def test_write_to_remote_on_close_failed_read_old_logs(self, mock_blob, mock_cli ) @pytest.mark.parametrize( - "delete_local_copy, expected_existence_of_local_copy", - [(True, False), (False, True)], + "delete_local_copy, expected_existence_of_local_copy, airflow_version", + [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True, "2.5.0"), (False, True, "2.5.0")], ) @mock.patch( "airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id", @@ -265,9 +265,12 @@ def test_close_with_delete_local_copy_conf( local_log_location, delete_local_copy, expected_existence_of_local_copy, + airflow_version, ): mock_blob.from_string.return_value.download_as_bytes.return_value = b"CONTENT" - with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}): + with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}), mock.patch( + "airflow.version.version", airflow_version + ): handler = GCSTaskHandler( base_log_folder=local_log_location, gcs_log_folder="gs://bucket/remote/log/location",