Skip to content

Commit

Permalink
Extend task context logging support for remote logging using GCP GCS
Browse files Browse the repository at this point in the history
With the addition of taxt context logging feature in PR apache#32646,
this PR extends the feature to GCP Cloud storage when is it set as
remote logging store. Here, backward compatibility is ensured for
older versions of Airflow that do not have the feature included in
Airflow Core.
  • Loading branch information
pankajkoti committed Nov 17, 2023
1 parent 0c6fd5b commit 4af74f0
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions airflow/providers/google/cloud/log/gcs_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import shutil
from functools import cached_property
from pathlib import Path
from typing import Collection
from typing import TYPE_CHECKING, Collection

# not sure why but mypy complains on missing `storage` but it is clearly there and is importable
from google.cloud import storage # type: ignore[attr-defined]
Expand All @@ -36,6 +36,9 @@
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance

_DEFAULT_SCOPESS = frozenset(
[
"https://www.googleapis.com/auth/devstorage.read_write",
Expand Down Expand Up @@ -96,6 +99,7 @@ def __init__(
**kwargs,
):
super().__init__(base_log_folder, filename_template)
self.handler: logging.FileHandler | None = None
self.remote_base = gcs_log_folder
self.log_relative_path = ""
self.closed = False
Expand Down Expand Up @@ -137,15 +141,21 @@ def client(self) -> storage.Client:
project=self.project_id if self.project_id else project_id,
)

def set_context(self, ti):
super().set_context(ti)
def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None:
if getattr(self, "supports_task_context_logging", False):
super().set_context(ti, identifier=identifier)
else:
super().set_context(ti)
# Log relative path is used to construct local and remote
# log path to upload log files into GCS and read from the
# remote location.
if TYPE_CHECKING:
assert self.handler is not None

full_path = self.handler.baseFilename
self.log_relative_path = Path(full_path).relative_to(self.local_base).as_posix()
is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
self.upload_on_close = is_trigger_log_context or not ti.raw
self.upload_on_close = is_trigger_log_context or not getattr(ti, "raw", None)

def close(self):
"""Close and upload local log file to remote storage GCS."""
Expand Down

0 comments on commit 4af74f0

Please sign in to comment.