Skip to content

Commit

Permalink
Support google-cloud-logging` >=2.0.0 (#13801)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj committed Feb 3, 2021
1 parent 629abfd commit 0e8c77b
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 106 deletions.
1 change: 1 addition & 0 deletions airflow/providers/google/ADDITIONAL_INFO.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Details are covered in the UPDATING.md files for each library, but there are som
| [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=3.0.0,<4.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) |
| [``google-cloud-dataproc``](https://pypi.org/project/google-cloud-dataproc/) | ``>=1.0.1,<2.0.0`` | ``>=2.2.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-dataproc/blob/master/UPGRADING.md) |
| [``google-cloud-kms``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) |
| [``google-cloud-logging``](https://pypi.org/project/google-cloud-logging/) | ``>=1.14.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-logging/blob/master/UPGRADING.md) |
| [``google-cloud-monitoring``](https://pypi.org/project/google-cloud-monitoring/) | ``>=0.34.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-monitoring/blob/master/UPGRADING.md) |
| [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) |
| [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) |
Expand Down
51 changes: 37 additions & 14 deletions airflow/providers/google/cloud/log/stackdriver_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@

from cached_property import cached_property
from google.api_core.gapic_v1.client_info import ClientInfo
from google.auth.credentials import Credentials
from google.cloud import logging as gcp_logging
from google.cloud.logging import Resource
from google.cloud.logging.handlers.transports import BackgroundThreadTransport, Transport
from google.cloud.logging.resource import Resource
from google.cloud.logging_v2.services.logging_service_v2 import LoggingServiceV2Client
from google.cloud.logging_v2.types import ListLogEntriesRequest, ListLogEntriesResponse

from airflow import version
from airflow.models import TaskInstance
Expand Down Expand Up @@ -102,18 +105,33 @@ def __init__(
self.task_instance_hostname = 'default-hostname'

@cached_property
def _client(self) -> gcp_logging.Client:
"""Google Cloud Library API client"""
def _credentials_and_project(self) -> Tuple[Credentials, str]:
credentials, project = get_credentials_and_project_id(
key_path=self.gcp_key_path, scopes=self.scopes, disable_logging=True
)
return credentials, project

@property
def _client(self) -> gcp_logging.Client:
"""The Cloud Library API client"""
credentials, project = self._credentials_and_project
client = gcp_logging.Client(
credentials=credentials,
project=project,
client_info=ClientInfo(client_library_version='airflow_v' + version.version),
)
return client

@property
def _logging_service_client(self) -> LoggingServiceV2Client:
"""The Cloud logging service v2 client."""
credentials, _ = self._credentials_and_project
client = LoggingServiceV2Client(
credentials=credentials,
client_info=ClientInfo(client_library_version='airflow_v' + version.version),
)
return client

@cached_property
def _transport(self) -> Transport:
"""Object responsible for sending data to Stackdriver"""
Expand Down Expand Up @@ -214,9 +232,10 @@ def escale_label_value(value: str) -> str:
escaped_value = value.replace("\\", "\\\\").replace('"', '\\"')
return f'"{escaped_value}"'

_, project = self._credentials_and_project
log_filters = [
f'resource.type={escale_label_value(self.resource.type)}',
f'logName="projects/{self._client.project}/logs/{self.name}"',
f'logName="projects/{project}/logs/{self.name}"',
]

for key, value in self.resource.labels.items():
Expand Down Expand Up @@ -277,17 +296,21 @@ def _read_single_logs_page(self, log_filter: str, page_token: Optional[str] = No
:return: Downloaded logs and next page token
:rtype: Tuple[str, str]
"""
entries = self._client.list_entries(
filter_=log_filter, page_token=page_token, order_by='timestamp asc', page_size=1000
_, project = self._credentials_and_project
request = ListLogEntriesRequest(
resource_names=[f'projects/{project}'],
filter=log_filter,
page_token=page_token,
order_by='timestamp asc',
page_size=1000,
)
page = next(entries.pages)
next_page_token = entries.next_page_token
response = self._logging_service_client.list_log_entries(request=request)
page: ListLogEntriesResponse = next(response.pages)
messages = []
for entry in page:
if "message" in entry.payload:
messages.append(entry.payload["message"])

return "\n".join(messages), next_page_token
for entry in page.entries:
if "message" in entry.json_payload:
messages.append(entry.json_payload["message"])
return "\n".join(messages), page.next_page_token

@classmethod
def _task_instance_to_labels(cls, ti: TaskInstance) -> Dict[str, str]:
Expand Down Expand Up @@ -323,7 +346,7 @@ def get_external_log_url(self, task_instance: TaskInstance, try_number: int) ->
:return: URL to the external log collection service
:rtype: str
"""
project_id = self._client.project
_, project_id = self._credentials_and_project

ti_labels = self._task_instance_to_labels(task_instance)
ti_labels[self.LABEL_TRY_NUMBER] = str(try_number)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def get_sphinx_theme_version() -> str:
'google-cloud-dlp>=0.11.0,<2.0.0',
'google-cloud-kms>=2.0.0,<3.0.0',
'google-cloud-language>=1.1.1,<2.0.0',
'google-cloud-logging>=1.14.0,<2.0.0',
'google-cloud-logging>=2.1.1,<3.0.0',
'google-cloud-memcache>=0.2.0',
'google-cloud-monitoring>=2.0.0,<3.0.0',
'google-cloud-os-login>=2.0.0,<3.0.0',
Expand Down
Loading

0 comments on commit 0e8c77b

Please sign in to comment.