@@ -16,9 +16,11 @@
from datetime import timedelta
from datetime import timezone
import logging
import numbers
import os
import pytest
import unittest
import uuid
from google .api_core .exceptions import BadGateway
from google .api_core .exceptions import Conflict
@@ -36,6 +38,8 @@
from google .cloud .logging_v2 import client
from google .cloud .logging_v2 .resource import Resource
from google .protobuf .struct_pb2 import Struct , Value , ListValue , NullValue
from test_utils .retry import RetryErrors
from test_utils .retry import RetryResult
from test_utils .system import unique_resource_id
@@ -142,32 +146,119 @@ def tearDown(self):
def _logger_name (prefix ):
return prefix + unique_resource_id ("-" )
def test_list_entry_with_unregistered (self ):
from google .protobuf import any_pb2
@staticmethod
def _to_value (data ):
if data is None :
return Value (null_value = NullValue .NULL_VALUE )
elif isinstance (data , numbers .Number ):
return Value (number_value = data )
elif isinstance (data , str ):
return Value (string_value = data )
elif isinstance (data , bool ):
return Value (bool_value = data )
elif isinstance (data , (list , tuple , set )):
return Value (
list_value = ListValue (values = (TestLogging ._to_value (e ) for e in data ))
)
elif isinstance (data , dict ):
return Value (struct_value = TestLogging ._dict_to_struct (data ))
else :
raise TypeError ("Unknown data type: %r" % type (data ))
@staticmethod
def _dict_to_struct (data ):
return Struct (fields = {k : TestLogging ._to_value (v ) for k , v in data .items ()})
def test_list_entry_with_auditlog (self ):
"""
Test emitting and listing logs containing a google.cloud.audit.AuditLog proto message
"""
from google .protobuf import descriptor_pool
from google .cloud .logging_v2 import entries
pool = descriptor_pool .Default ()
type_name = "google.cloud.audit.AuditLog"
# Make sure the descriptor is not known in the registry.
with self .assertRaises (KeyError ):
pool .FindMessageTypeByName (type_name )
type_url = "type.googleapis.com/" + type_name
filter_ = self .TYPE_FILTER .format (type_url ) + f" AND { _time_filter } "
entry_iter = iter (Config .CLIENT .list_entries (page_size = 1 , filter_ = filter_ ))
# Make sure the descriptor is known in the registry.
# Raises KeyError if unknown
pool .FindMessageTypeByName (type_name )
# create log
audit_dict = {
"@type" : type_url ,
"methodName" : "test" ,
"requestMetadata" : {"callerIp" : "::1" , "callerSuppliedUserAgent" : "test" },
"resourceName" : "test" ,
"serviceName" : "test" ,
"status" : {"code" : 0 },
}
audit_struct = self ._dict_to_struct (audit_dict )
logger = Config .CLIENT .logger (f"audit-proto-{ uuid .uuid1 ()} " )
logger .log_proto (audit_struct )
# retrieve log
retry = RetryErrors ((TooManyRequests , StopIteration ), max_tries = 8 )
protobuf_entry = retry (lambda : next (logger .list_entries ()))()
retry = RetryErrors (TooManyRequests )
protobuf_entry = retry (lambda : next (entry_iter ))()
self .assertIsInstance (protobuf_entry , entries .ProtobufEntry )
self .assertIsNone (protobuf_entry .payload_pb )
self .assertIsInstance (protobuf_entry .payload_json , dict )
self .assertEqual (protobuf_entry .payload_json ["@type" ], type_url )
self .assertEqual (
protobuf_entry .payload_json ["methodName" ], audit_dict ["methodName" ]
)
self .assertEqual (
protobuf_entry .to_api_repr ()["protoPayload" ]["@type" ], type_url
)
self .assertEqual (
protobuf_entry .to_api_repr ()["protoPayload" ]["methodName" ],
audit_dict ["methodName" ],
)
def test_list_entry_with_requestlog (self ):
"""
Test emitting and listing logs containing a google.appengine.logging.v1.RequestLog proto message
"""
from google .protobuf import descriptor_pool
from google .cloud .logging_v2 import entries
pool = descriptor_pool .Default ()
type_name = "google.appengine.logging.v1.RequestLog"
type_url = "type.googleapis.com/" + type_name
# Make sure the descriptor is known in the registry.
# Raises KeyError if unknown
pool .FindMessageTypeByName (type_name )
# create log
req_dict = {
"@type" : type_url ,
"ip" : "0.0.0.0" ,
"appId" : "test" ,
"versionId" : "test" ,
"requestId" : "12345" ,
"latency" : "500.0s" ,
"method" : "GET" ,
"status" : 500 ,
"resource" : "test" ,
"httpVersion" : "HTTP/1.1" ,
}
req_struct = self ._dict_to_struct (req_dict )
logger = Config .CLIENT .logger (f"req-proto-{ uuid .uuid1 ()} " )
logger .log_proto (req_struct )
# retrieve log
retry = RetryErrors ((TooManyRequests , StopIteration ), max_tries = 8 )
protobuf_entry = retry (lambda : next (logger .list_entries ()))()
self .assertIsInstance (protobuf_entry , entries .ProtobufEntry )
if Config .CLIENT ._use_grpc :
self .assertIsNone (protobuf_entry .payload_json )
self .assertIsInstance (protobuf_entry .payload_pb , any_pb2 .Any )
self .assertEqual (protobuf_entry .payload_pb .type_url , type_url )
else :
self .assertIsNone (protobuf_entry .payload_pb )
self .assertEqual (protobuf_entry .payload_json ["@type" ], type_url )
self .assertIsNone (protobuf_entry .payload_pb )
self .assertIsInstance (protobuf_entry .payload_json , dict )
self .assertEqual (protobuf_entry .payload_json ["@type" ], type_url )
self .assertEqual (
protobuf_entry .to_api_repr ()["protoPayload" ]["@type" ], type_url
)
def test_log_text (self ):
TEXT_PAYLOAD = "System test: test_log_text"
@@ -288,7 +379,7 @@ def test_log_handler_async(self):
cloud_logger = logging .getLogger (handler .name )
cloud_logger .addHandler (handler )
cloud_logger .warn (LOG_MESSAGE )
cloud_logger .warning (LOG_MESSAGE )
handler .flush ()
entries = _list_entries (logger )
expected_payload = {"message" : LOG_MESSAGE , "python_logger" : handler .name }
@@ -310,7 +401,7 @@ def test_log_handler_sync(self):
LOGGER_NAME = "mylogger"
cloud_logger = logging .getLogger (LOGGER_NAME )
cloud_logger .addHandler (handler )
cloud_logger .warn (LOG_MESSAGE )
cloud_logger .warning (LOG_MESSAGE )
entries = _list_entries (logger )
expected_payload = {"message" : LOG_MESSAGE , "python_logger" : LOGGER_NAME }
@@ -342,7 +433,7 @@ def test_handlers_w_extras(self):
"resource" : Resource (type = "cloudiot_device" , labels = {}),
"labels" : {"test-label" : "manual" },
}
cloud_logger .warn (LOG_MESSAGE , extra = extra )
cloud_logger .warning (LOG_MESSAGE , extra = extra )
entries = _list_entries (logger )
self .assertEqual (len (entries ), 1 )
@@ -363,7 +454,7 @@ def test_log_root_handler(self):
self .to_delete .append (logger )
google .cloud .logging .handlers .handlers .setup_logging (handler )
logging .warn (LOG_MESSAGE )
logging .warning (LOG_MESSAGE )
entries = _list_entries (logger )
expected_payload = {"message" : LOG_MESSAGE , "python_logger" : "root" }