Skip to content

Commit

Permalink
feat!: support json logs (#316)
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Jan 27, 2022
1 parent e3cac88 commit 5267152
Show file tree
Hide file tree
Showing 13 changed files with 437 additions and 91 deletions.
2 changes: 2 additions & 0 deletions google/cloud/logging_v2/handlers/_monitored_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ def _create_global_resource(project):

def detect_resource(project=""):
"""Return the default monitored resource based on the local environment.
If GCP resource not found, defaults to `global`.
Args:
project (str): The project ID to pass on to the resource (if needed)
Returns:
Expand Down
27 changes: 19 additions & 8 deletions google/cloud/logging_v2/handlers/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

"""Python :mod:`logging` handlers for Cloud Logging."""

import collections
import json
import logging

Expand Down Expand Up @@ -92,15 +93,19 @@ def filter(self, record):
record._span_id = getattr(record, "span_id", inferred_span) or None
record._http_request = getattr(record, "http_request", inferred_http)
record._source_location = CloudLoggingFilter._infer_source_location(record)
record._labels = {**self.default_labels, **user_labels} or None
# add logger name as a label if possible
logger_label = {"python_logger": record.name} if record.name else {}
record._labels = {**logger_label, **self.default_labels, **user_labels} or None
# create string representations for structured logging
record._trace_str = record._trace or ""
record._span_id_str = record._span_id or ""
record._http_request_str = json.dumps(record._http_request or {})
record._source_location_str = json.dumps(record._source_location or {})
record._labels_str = json.dumps(record._labels or {})
# break quotes for parsing through structured logging
record._msg_str = str(record.msg).replace('"', '\\"') if record.msg else ""
record._http_request_str = json.dumps(
record._http_request or {}, ensure_ascii=False
)
record._source_location_str = json.dumps(
record._source_location or {}, ensure_ascii=False
)
record._labels_str = json.dumps(record._labels or {}, ensure_ascii=False)
return True


Expand Down Expand Up @@ -183,9 +188,15 @@ def emit(self, record):
Args:
record (logging.LogRecord): The record to be logged.
"""
message = super(CloudLoggingHandler, self).format(record)
labels = record._labels
resource = record._resource or self.resource
labels = record._labels
message = None
if isinstance(record.msg, collections.abc.Mapping):
# if input is a dictionary, pass as-is for structured logging
message = record.msg
elif record.msg:
# otherwise, format message string based on superclass
message = super(CloudLoggingHandler, self).format(record)
if resource.type == _GAE_RESOURCE_TYPE and record._trace is not None:
# add GAE-specific label
labels = {_GAE_TRACE_ID_LABEL: record._trace, **(labels or {})}
Expand Down
31 changes: 21 additions & 10 deletions google/cloud/logging_v2/handlers/structured_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@

"""Logging handler for printing formatted structured logs to standard output.
"""
import collections
import json
import logging.handlers

from google.cloud.logging_v2.handlers.handlers import CloudLoggingFilter
from google.cloud.logging_v2.handlers.handlers import _format_and_parse_message

GCP_FORMAT = (
'{"message": %(_formatted_msg)s, '
"{%(_payload_str)s"
'"severity": "%(levelname)s", '
'"logging.googleapis.com/labels": %(_labels_str)s, '
'"logging.googleapis.com/trace": "%(_trace_str)s", '
'"logging.googleapis.com/spanId": "%(_span_id_str)s", '
'"logging.googleapis.com/trace_sampled": %(_trace_sampled_str)s, '
'"logging.googleapis.com/sourceLocation": %(_source_location_str)s, '
'"httpRequest": %(_http_request_str)s }'
'"httpRequest": %(_http_request_str)s '
"}"
)


Expand Down Expand Up @@ -57,15 +61,22 @@ def format(self, record):
Args:
record (logging.LogRecord): The log record.
Returns:
str: A JSON string formatted for GKE fluentd.
str: A JSON string formatted for GCP structured logging.
"""
# let other formatters alter the message
super_payload = None
if record.msg:
# format the message using default handler behaviors
super_payload = super(StructuredLogHandler, self).format(record)
# properly break any formatting in string to make it json safe
record._formatted_msg = json.dumps(super_payload or "")
payload = None
message = _format_and_parse_message(record, super(StructuredLogHandler, self))

if isinstance(message, collections.abc.Mapping):
# if input is a dictionary, encode it as a json string
encoded_msg = json.dumps(message, ensure_ascii=False)
# strip out open and close parentheses
payload = encoded_msg.lstrip("{").rstrip("}") + ","
elif message:
# properly break any formatting in string to make it json safe
encoded_message = json.dumps(message, ensure_ascii=False)
payload = '"message": {},'.format(encoded_message)

record._payload_str = payload or ""
# remove exception info to avoid duplicating it
# https://github.com/googleapis/python-logging/issues/382
record.exc_info = None
Expand Down
14 changes: 10 additions & 4 deletions google/cloud/logging_v2/handlers/transports/background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def _thread_main(self):
if item is _WORKER_TERMINATOR:
done = True # Continue processing items.
else:
batch.log_struct(**item)
batch.log(**item)

self._safely_commit_batch(batch)

Expand Down Expand Up @@ -226,12 +226,18 @@ def enqueue(self, record, message, **kwargs):
Args:
record (logging.LogRecord): Python log record that the handler was called with.
message (str): The message from the ``LogRecord`` after being
message (str or dict): The message from the ``LogRecord`` after being
formatted by the associated log formatters.
kwargs: Additional optional arguments for the logger
"""
# set python logger name as label if missing
labels = kwargs.pop("labels", {})
if record.name:
labels["python_logger"] = labels.get("python_logger", record.name)
kwargs["labels"] = labels
# enqueue new entry
queue_entry = {
"info": {"message": message, "python_logger": record.name},
"message": message,
"severity": _helpers._normalize_severity(record.levelno),
"timestamp": datetime.datetime.utcfromtimestamp(record.created),
}
Expand Down Expand Up @@ -285,7 +291,7 @@ def send(self, record, message, **kwargs):
Args:
record (logging.LogRecord): Python log record that the handler was called with.
message (str): The message from the ``LogRecord`` after being
message (str or dict): The message from the ``LogRecord`` after being
formatted by the associated log formatters.
kwargs: Additional optional arguments for the logger
"""
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/logging_v2/handlers/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def send(self, record, message, **kwargs):
Args:
record (logging.LogRecord): Python log record that the handler was called with.
message (str): The message from the ``LogRecord`` after being
message (str or dict): The message from the ``LogRecord`` after being
formatted by the associated log formatters.
kwargs: Additional optional arguments for the logger
"""
Expand Down
16 changes: 11 additions & 5 deletions google/cloud/logging_v2/handlers/transports/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
Logs directly to the the Cloud Logging API with a synchronous call.
"""

from google.cloud.logging_v2 import _helpers
from google.cloud.logging_v2.handlers.transports.base import Transport

Expand All @@ -36,11 +35,18 @@ def send(self, record, message, **kwargs):
Args:
record (logging.LogRecord):
Python log record that the handler was called with.
message (str): The message from the ``LogRecord`` after being
message (str or dict): The message from the ``LogRecord`` after being
formatted by the associated log formatters.
kwargs: Additional optional arguments for the logger
"""
info = {"message": message, "python_logger": record.name}
self.logger.log_struct(
info, severity=_helpers._normalize_severity(record.levelno), **kwargs,
# set python logger name as label if missing
labels = kwargs.pop("labels", {})
if record.name:
labels["python_logger"] = labels.get("python_logger", record.name)
# send log synchronously
self.logger.log(
message,
severity=_helpers._normalize_severity(record.levelno),
labels=labels,
**kwargs,
)
45 changes: 45 additions & 0 deletions google/cloud/logging_v2/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

"""Define API Loggers."""

import collections

from google.cloud.logging_v2._helpers import _add_defaults_to_filter
from google.cloud.logging_v2.entries import LogEntry
from google.cloud.logging_v2.entries import ProtobufEntry
from google.cloud.logging_v2.entries import StructEntry
from google.cloud.logging_v2.entries import TextEntry
from google.cloud.logging_v2.resource import Resource

import google.protobuf.message

_GLOBAL_RESOURCE = Resource(type="global", labels={})

Expand Down Expand Up @@ -197,6 +200,30 @@ def log_proto(self, message, *, client=None, **kw):
"""
self._do_log(client, ProtobufEntry, message, **kw)

def log(self, message=None, *, client=None, **kw):
"""Log an arbitrary message via a POST request.
Type will be inferred based on the input message.
See
https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list
Args:
message (Optional[str or dict or google.protobuf.Message]): The message. to log
client (Optional[~logging_v2.client.Client]):
The client to use. If not passed, falls back to the
``client`` stored on the current sink.
kw (Optional[dict]): additional keyword arguments for the entry.
See :class:`~logging_v2.entries.LogEntry`.
"""
entry_type = LogEntry
if isinstance(message, google.protobuf.message.Message):
entry_type = ProtobufEntry
elif isinstance(message, collections.abc.Mapping):
entry_type = StructEntry
elif isinstance(message, str):
entry_type = TextEntry
self._do_log(client, entry_type, message, **kw)

def delete(self, logger_name=None, *, client=None):
"""Delete all entries in a logger via a DELETE request
Expand Down Expand Up @@ -361,6 +388,24 @@ def log_proto(self, message, **kw):
"""
self.entries.append(ProtobufEntry(payload=message, **kw))

def log(self, message=None, **kw):
"""Add an arbitrary message to be logged during :meth:`commit`.
Type will be inferred based on the input message.
Args:
message (Optional[str or dict or google.protobuf.Message]): The message. to log
kw (Optional[dict]): Additional keyword arguments for the entry.
See :class:`~logging_v2.entries.LogEntry`.
"""
entry_type = LogEntry
if isinstance(message, google.protobuf.message.Message):
entry_type = ProtobufEntry
elif isinstance(message, collections.abc.Mapping):
entry_type = StructEntry
elif isinstance(message, str):
entry_type = TextEntry
self.entries.append(entry_type(payload=message, **kw))

def commit(self, *, client=None):
"""Send saved log entries as a single API call.
Expand Down
Loading

0 comments on commit 5267152

Please sign in to comment.