Skip to content

Commit

Permalink
✨Source Google Ads: Add possibility to sync all connected accounts (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Jan 11, 2024
1 parent c7c6a27 commit c1574b8
Show file tree
Hide file tree
Showing 16 changed files with 538 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 253487c0-2246-43ba-a21f-5116b20a2c50
dockerImageTag: 3.1.0
dockerImageTag: 3.2.0
dockerRepository: airbyte/source-google-ads
documentationUrl: https://docs.airbyte.com/integrations/sources/google-ads
githubIssueLabel: source-google-ads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,28 @@ def __init__(self, credentials: MutableMapping[str, Any]):
# `google-ads` library version `14.0.0` and higher requires an additional required parameter `use_proto_plus`.
# More details can be found here: https://developers.google.com/google-ads/api/docs/client-libs/python/protobuf-messages
credentials["use_proto_plus"] = True
self.client = self.get_google_ads_client(credentials)
self.ga_service = self.client.get_service("GoogleAdsService")
self.clients = {}
self.ga_services = {}
self.credentials = credentials

self.clients["default"] = self.get_google_ads_client(credentials)
self.ga_services["default"] = self.clients["default"].get_service("GoogleAdsService")

self.customer_service = self.clients["default"].get_service("CustomerService")

def get_client(self, login_customer_id="default"):
if login_customer_id in self.clients:
return self.clients[login_customer_id]
new_creds = self.credentials.copy()
new_creds["login_customer_id"] = login_customer_id
self.clients[login_customer_id] = self.get_google_ads_client(new_creds)
return self.clients[login_customer_id]

def ga_service(self, login_customer_id="default"):
if login_customer_id in self.ga_services:
return self.ga_services[login_customer_id]
self.ga_services[login_customer_id] = self.clients[login_customer_id].get_service("GoogleAdsService")
return self.ga_services[login_customer_id]

@staticmethod
def get_google_ads_client(credentials) -> GoogleAdsClient:
Expand All @@ -38,6 +58,14 @@ def get_google_ads_client(credentials) -> GoogleAdsClient:
message = "The authentication to Google Ads has expired. Re-authenticate to restore access to Google Ads."
raise AirbyteTracedException(message=message, failure_type=FailureType.config_error) from e

def get_accessible_accounts(self):
customer_resource_names = self.customer_service.list_accessible_customers().resource_names
logger.info(f"Found {len(customer_resource_names)} accessible accounts: {customer_resource_names}")

for customer_resource_name in customer_resource_names:
customer_id = self.ga_service().parse_customer_path(customer_resource_name)["customer_id"]
yield customer_id

@backoff.on_exception(
backoff.expo,
(InternalServerError, ServerError, TooManyRequests),
Expand All @@ -46,13 +74,13 @@ def get_google_ads_client(credentials) -> GoogleAdsClient:
),
max_tries=5,
)
def send_request(self, query: str, customer_id: str) -> Iterator[SearchGoogleAdsResponse]:
client = self.client
def send_request(self, query: str, customer_id: str, login_customer_id: str = "default") -> Iterator[SearchGoogleAdsResponse]:
client = self.get_client(login_customer_id)
search_request = client.get_type("SearchGoogleAdsRequest")
search_request.query = query
search_request.page_size = self.DEFAULT_PAGE_SIZE
search_request.customer_id = customer_id
return [self.ga_service.search(search_request)]
return [self.ga_service(login_customer_id).search(search_request)]

def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]:
"""
Expand All @@ -61,8 +89,8 @@ def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]:
:return dict of fields type info.
"""

ga_field_service = self.client.get_service("GoogleAdsFieldService")
request = self.client.get_type("SearchGoogleAdsFieldsRequest")
ga_field_service = self.get_client().get_service("GoogleAdsFieldService")
request = self.get_client().get_type("SearchGoogleAdsFieldsRequest")
request.page_size = len(fields)
fields_sql = ",".join([f"'{field}'" for field in fields])
request.query = f"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,32 @@


from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Union
from typing import Any, Iterable, Mapping

from pendulum import timezone
from pendulum import local_timezone, timezone
from pendulum.tz.timezone import Timezone


@dataclass
class CustomerModel:
id: str
time_zone: Union[timezone, str] = "local"
time_zone: timezone = local_timezone()
is_manager_account: bool = False
login_customer_id: str = None

@classmethod
def from_accounts(cls, accounts: Iterable[Iterable[Mapping[str, Any]]]):
def from_accounts(cls, accounts: Iterable[Mapping[str, Any]]) -> Iterable["CustomerModel"]:
data_objects = []
for account_list in accounts:
for account in account_list:
time_zone_name = account.get("customer.time_zone")
tz = Timezone(time_zone_name) if time_zone_name else "local"
for account in accounts:
time_zone_name = account.get("customer_client.time_zone")
tz = Timezone(time_zone_name) if time_zone_name else local_timezone()

data_objects.append(
cls(id=str(account["customer.id"]), time_zone=tz, is_manager_account=bool(account.get("customer.manager")))
data_objects.append(
cls(
id=str(account["customer_client.id"]),
time_zone=tz,
is_manager_account=bool(account.get("customer_client.manager")),
login_customer_id=account.get("login_customer_id"),
)
)
return data_objects
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"customer_client.client_customer": {
"type": ["null", "boolean"]
},
"customer_client.level": {
"type": ["null", "string"]
},
"customer_client.id": {
"type": ["null", "integer"]
},
"customer_client.manager": {
"type": ["null", "boolean"]
},
"customer_client.time_zone": {
"type": ["null", "number"]
},
"customer_client.status": {
"type": ["null", "string"]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
CampaignLabel,
ClickView,
Customer,
CustomerClient,
CustomerLabel,
DisplayKeywordView,
GeographicView,
Expand All @@ -47,6 +48,8 @@
)
from .utils import GAQL

logger = logging.getLogger("airbyte")


class SourceGoogleAds(AbstractSource):
# Skip exceptions on missing streams
Expand All @@ -65,6 +68,11 @@ def _validate_and_transform(config: Mapping[str, Any]):
"https://developers.google.com/google-ads/api/fields/v15/query_validator"
)
raise AirbyteTracedException(message=message, failure_type=FailureType.config_error)

if "customer_id" in config:
config["customer_ids"] = config["customer_id"].split(",")
config.pop("customer_id")

return config

@staticmethod
Expand All @@ -73,10 +81,6 @@ def get_credentials(config: Mapping[str, Any]) -> MutableMapping[str, Any]:
# use_proto_plus is set to True, because setting to False returned wrong value types, which breaks the backward compatibility.
# For more info read the related PR's description: https://github.com/airbytehq/airbyte/pull/9996
credentials.update(use_proto_plus=True)

# https://developers.google.com/google-ads/api/docs/concepts/call-structure#cid
if "login_customer_id" in config and config["login_customer_id"].strip():
credentials["login_customer_id"] = config["login_customer_id"]
return credentials

@staticmethod
Expand All @@ -98,12 +102,45 @@ def get_incremental_stream_config(google_api: GoogleAds, config: Mapping[str, An
)
return incremental_stream_config

@staticmethod
def get_account_info(google_api: GoogleAds, config: Mapping[str, Any]) -> Iterable[Iterable[Mapping[str, Any]]]:
dummy_customers = [CustomerModel(id=_id) for _id in config["customer_id"].split(",")]
accounts_stream = ServiceAccounts(google_api, customers=dummy_customers)
for slice_ in accounts_stream.stream_slices():
yield accounts_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_)
def get_all_accounts(self, google_api: GoogleAds, customers: List[CustomerModel], customer_status_filter: List[str]) -> List[str]:
customer_clients_stream = CustomerClient(api=google_api, customers=customers, customer_status_filter=customer_status_filter)
for slice in customer_clients_stream.stream_slices():
for record in customer_clients_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice):
yield record

def _get_all_connected_accounts(
self, google_api: GoogleAds, customer_status_filter: List[str]
) -> Iterable[Iterable[Mapping[str, Any]]]:
customer_ids = [customer_id for customer_id in google_api.get_accessible_accounts()]
dummy_customers = [CustomerModel(id=_id, login_customer_id=_id) for _id in customer_ids]

yield from self.get_all_accounts(google_api, dummy_customers, customer_status_filter)

def get_customers(self, google_api: GoogleAds, config: Mapping[str, Any]) -> List[CustomerModel]:
customer_status_filter = config.get("customer_status_filter", [])
accounts = self._get_all_connected_accounts(google_api, customer_status_filter)
customers = CustomerModel.from_accounts(accounts)

# filter duplicates as one customer can be accessible from mutiple connected accounts
unique_customers = []
seen_ids = set()
for customer in customers:
if customer.id in seen_ids:
continue
seen_ids.add(customer.id)
unique_customers.append(customer)
customers = unique_customers
customers_dict = {customer.id: customer for customer in customers}

# filter only selected accounts
if config.get("customer_ids"):
customers = []
for customer_id in config["customer_ids"]:
if customer_id not in customers_dict:
logging.warning(f"Customer with id {customer_id} is not accessible. Skipping it.")
else:
customers.append(customers_dict[customer_id])
return customers

@staticmethod
def is_metrics_in_custom_query(query: GAQL) -> bool:
Expand Down Expand Up @@ -149,8 +186,9 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
logger.info("Checking the config")
google_api = GoogleAds(credentials=self.get_credentials(config))

accounts = self.get_account_info(google_api, config)
customers = CustomerModel.from_accounts(accounts)
customers = self.get_customers(google_api, config)
logger.info(f"Found {len(customers)} customers: {[customer.id for customer in customers]}")

# Check custom query request validity by sending metric request with non-existent time window
for customer in customers:
for query in config.get("custom_queries_array", []):
Expand All @@ -168,7 +206,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
query = IncrementalCustomQuery.insert_segments_date_expr(query, "1980-01-01", "1980-01-01")

query = query.set_limit(1)
response = google_api.send_request(str(query), customer_id=customer.id)
response = google_api.send_request(str(query), customer_id=customer.id, login_customer_id=customer.login_customer_id)
# iterate over the response otherwise exceptions will not be raised!
for _ in response:
pass
Expand All @@ -177,8 +215,10 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
config = self._validate_and_transform(config)
google_api = GoogleAds(credentials=self.get_credentials(config))
accounts = self.get_account_info(google_api, config)
customers = CustomerModel.from_accounts(accounts)

customers = self.get_customers(google_api, config)
logger.info(f"Found {len(customers)} customers: {[customer.id for customer in customers]}")

non_manager_accounts = [customer for customer in customers if not customer.is_manager_account]
default_config = dict(api=google_api, customers=customers)
incremental_config = self.get_incremental_stream_config(google_api, config, customers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Google Ads Spec",
"type": "object",
"required": ["credentials", "customer_id"],
"required": ["credentials"],
"additionalProperties": true,
"properties": {
"credentials": {
Expand Down Expand Up @@ -64,14 +64,26 @@
"examples": ["6783948572,5839201945"],
"order": 1
},
"customer_status_filter": {
"title": "Customer Statuses Filter",
"description": "A list of customer statuses to filter on. For detailed info about what each status mean refer to Google Ads <a href=\"https://developers.google.com/google-ads/api/reference/rpc/v15/CustomerStatusEnum.CustomerStatus\">documentation</a>.",
"default": [],
"order": 2,
"type": "array",
"items": {
"title": "CustomerStatus",
"description": "An enumeration.",
"enum": ["UNKNOWN", "ENABLED", "CANCELED", "SUSPENDED", "CLOSED"]
}
},
"start_date": {
"type": "string",
"title": "Start Date",
"description": "UTC date in the format YYYY-MM-DD. Any data before this date will not be replicated. (Default value of two years ago is used if not set)",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
"pattern_descriptor": "YYYY-MM-DD",
"examples": ["2017-01-25"],
"order": 2,
"order": 3,
"format": "date"
},
"end_date": {
Expand All @@ -81,14 +93,14 @@
"pattern": "^$|^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
"pattern_descriptor": "YYYY-MM-DD",
"examples": ["2017-01-30"],
"order": 6,
"order": 4,
"format": "date"
},
"custom_queries_array": {
"type": "array",
"title": "Custom GAQL Queries",
"description": "",
"order": 3,
"order": 5,
"items": {
"type": "object",
"required": ["query", "table_name"],
Expand All @@ -110,15 +122,6 @@
}
}
},
"login_customer_id": {
"type": "string",
"title": "Login Customer ID for Managed Accounts",
"description": "If your access to the customer account is through a manager account, this field is required, and must be set to the 10-digit customer ID of the manager account. For more information about this field, refer to <a href=\"https://developers.google.com/google-ads/api/docs/concepts/call-structure#cid\">Google's documentation</a>.",
"pattern_descriptor": ": 10 digits, with no dashes.",
"pattern": "^([0-9]{10})?$",
"examples": ["7349206847"],
"order": 4
},
"conversion_window_days": {
"title": "Conversion Window",
"type": "integer",
Expand All @@ -127,7 +130,7 @@
"maximum": 1095,
"default": 14,
"examples": [14],
"order": 5
"order": 6
}
}
},
Expand Down
Loading

0 comments on commit c1574b8

Please sign in to comment.