Skip to content

Commit

Permalink
Add Google Ads list accounts operator (#8007)
Browse files Browse the repository at this point in the history
  • Loading branch information
turbaszek committed Apr 2, 2020
1 parent 99370fe commit ed2bc00
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 6 deletions.
11 changes: 10 additions & 1 deletion airflow/providers/google/ads/example_dags/example_ads.py
Expand Up @@ -21,13 +21,14 @@
import os

from airflow import models
from airflow.providers.google.ads.operators.ads import GoogleAdsToGcsOperator
from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator, GoogleAdsToGcsOperator
from airflow.utils import dates

# [START howto_google_ads_env_variables]
CLIENT_IDS = ["1111111111", "2222222222"]
BUCKET = os.environ.get("GOOGLE_ADS_BUCKET", "gs://test-google-ads-bucket")
GCS_OBJ_PATH = "folder_name/google-ads-api-results.csv"
GCS_ACCOUNTS_CSV = "folder_name/accounts.csv"
QUERY = """
SELECT
segments.date,
Expand Down Expand Up @@ -79,3 +80,11 @@
task_id="run_operator",
)
# [END howto_google_ads_to_gcs_operator]

# [START howto_ads_list_accounts_operator]
list_accounts = GoogleAdsListAccountsOperator(
task_id="list_accounts",
bucket=BUCKET,
object_name=GCS_ACCOUNTS_CSV
)
# [END howto_ads_list_accounts_operator]
51 changes: 47 additions & 4 deletions airflow/providers/google/ads/hooks/ads.py
Expand Up @@ -21,6 +21,7 @@
from tempfile import NamedTemporaryFile
from typing import IO, Any, Dict, Generator, List

from cached_property import cached_property
from google.ads.google_ads.client import GoogleAdsClient
from google.ads.google_ads.errors import GoogleAdsException
from google.ads.google_ads.v2.types import GoogleAdsRow
Expand Down Expand Up @@ -52,7 +53,7 @@ def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
google_ads_conn_id: str = "google_ads_default",
api_version: str = "v2",
api_version: str = "v3",
) -> None:
super().__init__()
self.gcp_conn_id = gcp_conn_id
Expand All @@ -61,8 +62,11 @@ def __init__(
self.api_version = api_version
self.google_ads_config: Dict[str, Any] = {}

def _get_service(self) -> GoogleAdsClient:
"""Connects and authenticates with the Google Ads API using a service account"""
@cached_property
def _get_service(self):
"""
Connects and authenticates with the Google Ads API using a service account
"""
with NamedTemporaryFile("w", suffix=".json") as secrets_temp:
self._get_config()
self._update_config_with_secret(secrets_temp)
Expand All @@ -73,6 +77,21 @@ def _get_service(self) -> GoogleAdsClient:
self.log.error("Google Auth Error: %s", e)
raise

@cached_property
def _get_customer_service(self):
"""
Connects and authenticates with the Google Ads API using a service account
"""
with NamedTemporaryFile("w", suffix=".json") as secrets_temp:
self._get_config()
self._update_config_with_secret(secrets_temp)
try:
client = GoogleAdsClient.load_from_dict(self.google_ads_config)
return client.get_service("CustomerService", version=self.api_version)
except GoogleAuthError as e:
self.log.error("Google Auth Error: %s", e)
raise

def _get_config(self) -> None:
"""
Gets google ads connection from meta db and sets google_ads_config attribute with returned config file
Expand Down Expand Up @@ -112,7 +131,7 @@ def search(
:return: Google Ads API response, converted to Google Ads Row objects
:rtype: list[GoogleAdsRow]
"""
service = self._get_service()
service = self._get_service
iterators = (
service.search(client_id, query=query, page_size=page_size, **kwargs)
for client_id in client_ids
Expand Down Expand Up @@ -150,3 +169,27 @@ def _extract_rows(
"\t\tOn field: %s", field_path_element.field_name
)
raise

def list_accessible_customers(self) -> List[str]:
"""
Returns resource names of customers directly accessible by the user authenticating the call.
The resulting list of customers is based on your OAuth credentials. The request returns a list
of all accounts that you are able to act upon directly given your current credentials. This will
not necessarily include all accounts within the account hierarchy; rather, it will only include
accounts where your authenticated user has been added with admin or other rights in the account.
..seealso::
https://developers.google.com/google-ads/api/reference/rpc
:return: List of names of customers
"""
try:
accessible_customers = self._get_customer_service.list_accessible_customers()
return accessible_customers.resource_names
except GoogleAdsException as ex:
for error in ex.failure.errors:
self.log.error('\tError with message "%s".', error.message)
if error.location:
for field_path_element in error.location.field_path_elements:
self.log.error('\t\tOn field: %s', field_path_element.field_name)
raise
80 changes: 80 additions & 0 deletions airflow/providers/google/ads/operators/ads.py
Expand Up @@ -120,3 +120,83 @@ def execute(self, context: Dict):
gzip=self.gzip,
)
self.log.info("%s uploaded to GCS", self.obj)


class GoogleAdsListAccountsOperator(BaseOperator):
"""
Saves list of customers on GCS in form of a csv file.
The resulting list of customers is based on your OAuth credentials. The request returns a list
of all accounts that you are able to act upon directly given your current credentials. This will
not necessarily include all accounts within the account hierarchy; rather, it will only include
accounts where your authenticated user has been added with admin or other rights in the account.
..seealso::
https://developers.google.com/google-ads/api/reference/rpc
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GoogleAdsListAccountsOperator`
:param bucket: The GCS bucket to upload to
:type bucket: str
:param object_name: GCS path to save the csv file. Must be the full file path (ex. `path/to/file.csv`)
:type object_name: str
:param gcp_conn_id: Airflow Google Cloud Platform connection ID
:type gcp_conn_id: str
:param google_ads_conn_id: Airflow Google Ads connection ID
:type google_ads_conn_id: str
:param page_size: The number of results per API page request. Max 10,000
:type page_size: int
:param gzip: Option to compress local file or file data for upload
:type gzip: bool
"""

template_fields = ("bucket", "object_name")

@apply_defaults
def __init__(
self,
bucket: str,
object_name: str,
gcp_conn_id: str = "google_cloud_default",
google_ads_conn_id: str = "google_ads_default",
gzip: bool = False,
*args,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
self.bucket = bucket
self.object_name = object_name
self.gcp_conn_id = gcp_conn_id
self.google_ads_conn_id = google_ads_conn_id
self.gzip = gzip

def execute(self, context: Dict):
uri = f"gs://{self.bucket}/{self.object_name}"

ads_hook = GoogleAdsHook(
gcp_conn_id=self.gcp_conn_id,
google_ads_conn_id=self.google_ads_conn_id
)

gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)

with NamedTemporaryFile("w+") as temp_file:
# Download accounts
accounts = ads_hook.list_accessible_customers()
writer = csv.writer(temp_file)
writer.writerows(accounts)
temp_file.flush()

# Upload to GCS
gcs_hook.upload(
bucket_name=self.bucket,
object_name=self.object_name,
gzip=self.gzip,
filename=temp_file.name
)
self.log.info("Uploaded %s to %s", len(accounts), uri)

return uri
19 changes: 19 additions & 0 deletions docs/howto/operator/gcp/ads.rst
Expand Up @@ -47,3 +47,22 @@ Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.ads.operators.ads.GoogleAdsToGcsOperator`
parameters which allow you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to be used by other operators.

.. _howto/operator:GoogleAdsListAccountsOperator:

Upload Google Ads Accounts to GCS
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To upload Google Ads accounts to Google Cloud Storage bucket use the
:class:`~airflow.providers.google.ads.operators.ads.GoogleAdsListAccountsOperator`.

.. exampleinclude:: ../../../../airflow/providers/google/ads/example_dags/example_ads.py
:language: python
:dedent: 4
:start-after: [START howto_ads_list_accounts_operator]
:end-before: [END howto_ads_list_accounts_operator]

Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.ads.operators.ads.GoogleAdsToGcsOperator`
parameters which allow you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to be used by other operators.
19 changes: 19 additions & 0 deletions tests/providers/google/ads/hooks/test_ads.py
Expand Up @@ -38,6 +38,15 @@ def mock_hook():


class TestGoogleAdsHook:
@mock.patch("airflow.providers.google.ads.hooks.ads.GoogleAdsClient")
def test_get_customer_service(self, mock_client, mock_hook):
mock_hook._get_customer_service()
client = mock_client.load_from_dict
client.assert_called_once_with(mock_hook.google_ads_config)
client.return_value.get_service.assert_called_once_with(
"CustomerService", version=API_VERSION
)

@mock.patch("airflow.providers.google.ads.hooks.ads.GoogleAdsClient")
def test_get_service(self, mock_client, mock_hook):
mock_hook._get_service()
Expand Down Expand Up @@ -66,3 +75,13 @@ def test_search(self, mock_client, mock_hook):
def test_extract_rows(self, mock_hook):
iterators = [[1, 2, 3], [4, 5, 6]]
assert mock_hook._extract_rows(iterators) == sum(iterators, [])

@mock.patch("airflow.providers.google.ads.hooks.ads.GoogleAdsClient")
def test_list_accessible_customers(self, mock_client, mock_hook):
accounts = ["a", "b", "c"]
service = mock_client.load_from_dict.return_value.get_service.return_value
service.list_accessible_customers.return_value = mock.MagicMock(resource_names=accounts)

result = mock_hook.list_accessible_customers()
service.list_accessible_customers.assert_called_once_with()
assert accounts == result
38 changes: 37 additions & 1 deletion tests/providers/google/ads/operators/test_ads.py
Expand Up @@ -16,7 +16,7 @@
# under the License.
from unittest import mock

from airflow.providers.google.ads.operators.ads import GoogleAdsToGcsOperator
from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator, GoogleAdsToGcsOperator

CLIENT_IDS = ["1111111111", "2222222222"]
BUCKET = "gs://test-google-ads-bucket"
Expand Down Expand Up @@ -63,3 +63,39 @@ def test_execute(self, mock_gcs_hook, mock_ads_hook):
mock_gcs_hook.return_value.upload.assert_called_once_with(
bucket_name=BUCKET, object_name=GCS_OBJ_PATH, filename=mock.ANY, gzip=False
)


class TestGoogleAdsListAccountsOperator:
@mock.patch("airflow.providers.google.ads.operators.ads.GoogleAdsHook")
@mock.patch("airflow.providers.google.ads.operators.ads.GCSHook")
@mock.patch("airflow.providers.google.ads.operators.ads.NamedTemporaryFile")
@mock.patch("airflow.providers.google.ads.operators.ads.csv.writer")
def test_execute(self, mocks_csv_writer, mock_tempfile, mock_gcs_hook, mock_ads_hook):
filename = "test.csv"
file_object = mock_tempfile.return_value.__enter__.return_value
file_object.name = filename
accounts = ["a", "b", "c"]
mock_ads_hook.return_value.list_accessible_customers.return_value = accounts

op = GoogleAdsListAccountsOperator(
gcp_conn_id=gcp_conn_id,
google_ads_conn_id=google_ads_conn_id,
object_name=GCS_OBJ_PATH,
bucket=BUCKET,
task_id="run_operator",
)
op.execute({})

mock_ads_hook.assert_called_once_with(
gcp_conn_id=gcp_conn_id, google_ads_conn_id=google_ads_conn_id
)
mock_gcs_hook.assert_called_once_with(gcp_conn_id=gcp_conn_id)

mock_ads_hook.return_value.list_accessible_customers.assert_called_once_with()
mocks_csv_writer.assert_called_once_with(file_object)
mocks_csv_writer.return_value.writerows.assert_called_once_with(accounts)
file_object.flush.assert_called_once_with()

mock_gcs_hook.return_value.upload.assert_called_once_with(
bucket_name=BUCKET, object_name=GCS_OBJ_PATH, filename=filename, gzip=False
)

0 comments on commit ed2bc00

Please sign in to comment.