Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 54 additions & 37 deletions examples/custom_logging_interceptor/cloud_logging_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,16 @@

import logging
import time
from typing import Any, Callable, Dict, Optional, Tuple

from google.cloud import logging
from grpc import UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor
from google.cloud import logging as google_cloud_logging
from grpc import (
Call,
Future,
UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor,
)
from grpc._interceptor import _ClientCallDetails

from google.ads.googleads.interceptors import LoggingInterceptor, mask_message

Expand All @@ -40,27 +47,29 @@ class CloudLoggingInterceptor(LoggingInterceptor):
this is to inherit from the Interceptor class instead, and selectively copy whatever
logic is needed from the LoggingInterceptor class."""

def __init__(self, api_version):
def __init__(self, api_version: str):
"""Initializer for the CloudLoggingInterceptor.

Args:
api_version: a str of the API version of the request.
"""
super().__init__(logger=None, api_version=api_version)
# Instantiate the Cloud Logging client.
logging_client = logging.Client()
self.logger = logging_client.logger("cloud_logging")
logging_client: google_cloud_logging.Client = google_cloud_logging.Client()
self.logger: google_cloud_logging.Logger = logging_client.logger("cloud_logging")
self.rpc_start: float
self.rpc_end: float

def log_successful_request(
self,
method,
customer_id,
metadata_json,
request_id,
request,
trailing_metadata_json,
response,
):
method: str,
customer_id: Optional[str],
metadata_json: str,
request_id: str,
request: Any, # google.ads.googleads.vX.services.types.SearchGoogleAdsRequest or SearchGoogleAdsStreamRequest
trailing_metadata_json: str,
response: Any, # grpc.Call or grpc.Future
) -> None:
"""Handles logging of a successful request.

Args:
Expand All @@ -78,15 +87,15 @@ def log_successful_request(
# The response result could contain up to 10,000 rows of data,
# so consider truncating this value before logging it, to save
# on data storage costs and maintain readability.
result = self.retrieve_and_mask_result(response)
result: Any = self.retrieve_and_mask_result(response)

# elapsed_ms is the approximate elapsed time of the RPC, in milliseconds.
# There are different ways to define and measure elapsed time, so use
# whatever approach makes sense for your monitoring purposes.
# rpc_start and rpc_end are set in the intercept_unary_* methods below.
elapsed_ms = (self.rpc_end - self.rpc_start) * 1000
elapsed_ms: float = (self.rpc_end - self.rpc_start) * 1000

debug_log = {
debug_log: Dict[str, Any] = {
"method": method,
"host": metadata_json,
"request_id": request_id,
Expand All @@ -98,7 +107,7 @@ def log_successful_request(
}
self.logger.log_struct(debug_log, severity="DEBUG")

info_log = {
info_log: Dict[str, Any] = {
"customer_id": customer_id,
"method": method,
"request_id": request_id,
Expand All @@ -110,14 +119,14 @@ def log_successful_request(

def log_failed_request(
self,
method,
customer_id,
metadata_json,
request_id,
request,
trailing_metadata_json,
response,
):
method: str,
customer_id: Optional[str],
metadata_json: str,
request_id: str,
request: Any, # google.ads.googleads.vX.services.types.SearchGoogleAdsRequest or SearchGoogleAdsStreamRequest
trailing_metadata_json: str,
response: Any, # grpc.Call or grpc.Future
) -> None:
"""Handles logging of a failed request.

Args:
Expand All @@ -129,11 +138,11 @@ def log_failed_request(
trailing_metadata_json: A JSON str of trailing_metadata.
response: A JSON str of the response message.
"""
exception = self._get_error_from_response(response)
exception_str = self._parse_exception_to_str(exception)
fault_message = self._get_fault_message(exception)
exception: Any = self._get_error_from_response(response)
exception_str: str = self._parse_exception_to_str(exception)
fault_message: str = self._get_fault_message(exception)

info_log = {
info_log: Dict[str, Any] = {
"method": method,
"endpoint": self.endpoint,
"host": metadata_json,
Expand All @@ -145,7 +154,7 @@ def log_failed_request(
}
self.logger.log_struct(info_log, severity="INFO")

error_log = {
error_log: Dict[str, Any] = {
"method": method,
"endpoint": self.endpoint,
"request_id": request_id,
Expand All @@ -155,7 +164,12 @@ def log_failed_request(
}
self.logger.log_struct(error_log, severity="ERROR")

def intercept_unary_unary(self, continuation, client_call_details, request):
def intercept_unary_unary(
self,
continuation: Callable[[_ClientCallDetails, Any], Any], # Any is request type
client_call_details: _ClientCallDetails,
request: Any, # google.ads.googleads.vX.services.types.SearchGoogleAdsRequest
) -> Any: # grpc.Call or grpc.Future
"""Intercepts and logs API interactions.

Overrides abstract method defined in grpc.UnaryUnaryClientInterceptor.
Expand All @@ -171,15 +185,15 @@ def intercept_unary_unary(self, continuation, client_call_details, request):
A grpc.Call/grpc.Future instance representing a service response.
"""
# Set the rpc_end value to current time when RPC completes.
def update_rpc_end(response_future):
def update_rpc_end(response_future: Any) -> None: # response_future is grpc.Future
self.rpc_end = time.perf_counter()

# Capture precise clock time to later calculate approximate elapsed
# time of the RPC.
self.rpc_start = time.perf_counter()

# The below call is REQUIRED.
response = continuation(client_call_details, request)
response: Any = continuation(client_call_details, request) # response is grpc.Call or grpc.Future

response.add_done_callback(update_rpc_end)

Expand All @@ -189,8 +203,11 @@ def update_rpc_end(response_future):
return response

def intercept_unary_stream(
self, continuation, client_call_details, request
):
self,
continuation: Callable[[_ClientCallDetails, Any], Any], # Any is request type
client_call_details: _ClientCallDetails,
request: Any, # google.ads.googleads.vX.services.types.SearchGoogleAdsStreamRequest
) -> Any: # grpc.Call or grpc.Future
"""Intercepts and logs API interactions for Unary-Stream requests.

Overrides abstract method defined in grpc.UnaryStreamClientInterceptor.
Expand All @@ -206,7 +223,7 @@ def intercept_unary_stream(
A grpc.Call/grpc.Future instance representing a service response.
"""

def on_rpc_complete(response_future):
def on_rpc_complete(response_future: Any) -> None: # response_future is grpc.Future
self.rpc_end = time.perf_counter()
self.log_request(client_call_details, request, response_future)

Expand All @@ -215,7 +232,7 @@ def on_rpc_complete(response_future):
self.rpc_start = time.perf_counter()

# The below call is REQUIRED.
response = continuation(client_call_details, request)
response: Any = continuation(client_call_details, request) # response is grpc.Call or grpc.Future

# Set self._cache to the cache on the response wrapper in order to
# access the streaming logs. This is REQUIRED in order to log streaming
Expand Down
23 changes: 15 additions & 8 deletions examples/custom_logging_interceptor/get_campaigns.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,25 @@

import argparse
import sys
from typing import Any, Iterable

from google.ads.googleads.client import GoogleAdsClient
from google.ads.googleads.errors import GoogleAdsException
from google.ads.googleads.v20.services.services.google_ads_service import GoogleAdsServiceClient
from google.ads.googleads.v20.services.types.google_ads_service import SearchGoogleAdsStreamResponse
from google.ads.googleads.v20.types.google_ads_row import GoogleAdsRow

from cloud_logging_interceptor import CloudLoggingInterceptor


def main(client, customer_id):
def main(client: GoogleAdsClient, customer_id: str) -> None:
# Instantiate the GoogleAdsService object with a custom interceptor.
ga_service = client.get_service(
ga_service: GoogleAdsServiceClient = client.get_service(
"GoogleAdsService",
interceptors=[CloudLoggingInterceptor(api_version="v20")],
)

query = """
query: str = """
SELECT
campaign.id,
campaign.name
Expand All @@ -41,18 +46,18 @@ def main(client, customer_id):
LIMIT 10"""

# Issues a search request using streaming.
stream = ga_service.search_stream(customer_id=customer_id, query=query)
stream: Iterable[SearchGoogleAdsStreamResponse] = ga_service.search_stream(customer_id=customer_id, query=query)

for batch in stream:
for row in batch.results:
for row: GoogleAdsRow in batch.results:
print(
f"Campaign with ID {row.campaign.id} and name "
f'"{row.campaign.name}" was found.'
)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
parser: argparse.ArgumentParser = argparse.ArgumentParser(
description="Lists all campaigns for specified customer."
)
# The following argument(s) should be provided to run the example.
Expand All @@ -63,11 +68,13 @@ def main(client, customer_id):
required=True,
help="The Google Ads customer ID.",
)
args = parser.parse_args()
args: argparse.Namespace = parser.parse_args()

# GoogleAdsClient will read the google-ads.yaml configuration file in the
# home directory if none is specified.
googleads_client = GoogleAdsClient.load_from_storage(version="v20")
googleads_client: GoogleAdsClient = GoogleAdsClient.load_from_storage(
version="v20"
)

try:
main(googleads_client, args.customer_id)
Expand Down