Skip to content

Commit

Permalink
[AIRFLOW-6752] Add GoogleAnalyticsRetrieveAdsLinksListOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
michalslowikowski00 committed Mar 17, 2020
1 parent 4ff9d63 commit 8c55966
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 11 deletions.
Expand Up @@ -20,12 +20,15 @@

from airflow import models
from airflow.providers.google.marketing_platform.operators.analytics import (
GoogleAnalyticsListAccountsOperator,
GoogleAnalyticsListAccountsOperator, GoogleAnalyticsRetrieveAdsLinksListOperator,
)
from airflow.utils import dates

default_args = {"start_date": dates.days_ago(1)}

account_id = "12345"
web_property_id = "web_property_id"

with models.DAG(
"example_google_analytics",
default_args=default_args,
Expand All @@ -34,3 +37,9 @@
# [START howto_marketing_platform_list_accounts_operator]
list_account = GoogleAnalyticsListAccountsOperator(task_id="list_account")
# [END howto_marketing_platform_list_accounts_operator]

# [START howto_marketing_platform_retrieve_ads_links_list_operator]
list_ad_link = GoogleAnalyticsRetrieveAdsLinksListOperator(task_id="list_ad_link",
account_id=account_id,
web_property_id=web_property_id)
# [END howto_marketing_platform_retrieve_ads_links_list_operator]
35 changes: 34 additions & 1 deletion airflow/providers/google/marketing_platform/hooks/analytics.py
Expand Up @@ -66,10 +66,43 @@ def list_accounts(self) -> List[Dict[str, Any]]:
# start index has value 1
request = accounts.list(start_index=len(result) + 1)
response = request.execute(num_retries=self.num_retries)
result.extend(response.get('items', []))
result.extend(response.get("items", []))
# result is the number of fetched accounts from Analytics
# when all accounts will be add to the result
# the loop will be break
if response["totalResults"] <= len(result):
break
return result

def list_ad_words_links(
self, account_id: str, web_property_id: str
) -> List[Dict[str, Any]]:
"""
Lists webProperty-Google Ads links for a given web property.
# :param account_id: ID of the account which the given web property belongs to.
# :type account_id: str
# :param web_property_id: Web property ID to retrieve the Google Ads links for.
# :type web_property_id: str
"""

self.log.info("Retrieving ad words list...")
result = [] # type: List[Dict]
conn = self.get_conn()
ads_links = conn.management().webPropertyAdWordsLinks() # pylint: disable=no-member
while True:
# start index has value 1
request = ads_links.list(
accountId=account_id,
webPropertyId=web_property_id,
start_index=len(result) + 1,
)
response = request.execute(num_retries=self.num_retries)
result.extend(response.get("items", []))
# result is the number of fetched links from Analytics
# when all links will be add to the result
# the loop will be break
if response["totalResults"] <= len(result):
break
return result
74 changes: 66 additions & 8 deletions airflow/providers/google/marketing_platform/operators/analytics.py
Expand Up @@ -44,21 +44,79 @@ class GoogleAnalyticsListAccountsOperator(BaseOperator):
:type gcp_conn_id: str
"""

template_fields = ("api_version", "gcp_connection_id",)
template_fields = (
"api_version",
"gcp_connection_id",
)

@apply_defaults
def __init__(self,
api_version: str = "v3",
gcp_connection_id: str = "google_cloud_default",
*args,
**kwargs):
def __init__(
self,
api_version: str = "v3",
gcp_connection_id: str = "google_cloud_default",
*args,
**kwargs
):
super().__init__(*args, **kwargs)

self.api_version = api_version
self.gcp_connection_id = gcp_connection_id

def execute(self, context):
hook = GoogleAnalyticsHook(api_version=self.api_version,
gcp_connection_id=self.gcp_connection_id)
hook = GoogleAnalyticsHook(
api_version=self.api_version, gcp_connection_id=self.gcp_connection_id
)
result = hook.list_accounts()
return result


class GoogleAnalyticsRetrieveAdsLinksListOperator(BaseOperator):
"""
Lists webProperty-Google Ads links for a given web property
.. seealso::
Check official API docs:
https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/webPropertyAdWordsLinks/list#http-request
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GoogleAnalyticsListAccountsOperator`
:param account_id: ID of the account which the given web property belongs to.
:type account_id: str
:param web_property_id: Web property ID to retrieve the Google Ads links for.
:type web_property_id: str
"""

template_fields = (
"api_version",
"gcp_connection_id",
"account_id",
"web_property_id",
)

@apply_defaults
def __init__(
self,
account_id: str,
web_property_id: str,
api_version: str = "v3",
gcp_connection_id: str = "google_cloud_default",
*args,
**kwargs
):
super().__init__(*args, **kwargs)

self.account_id = account_id
self.web_property_id = web_property_id
self.api_version = api_version
self.gcp_connection_id = gcp_connection_id

def execute(self, context):
hook = GoogleAnalyticsHook(
api_version=self.api_version, gcp_connection_id=self.gcp_connection_id
)
result = hook.list_ad_words_links(
account_id=self.account_id, web_property_id=self.web_property_id,
)
return result
15 changes: 15 additions & 0 deletions docs/howto/operator/gcp/analytics.rst
Expand Up @@ -48,3 +48,18 @@ To list accounts from Analytics you can use the

You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.analytics.GoogleAnalyticsListAccountsOperator`

List Google Ads Links
^^^^^^^^^^^^^^^^^^^^^

To list Google Ads links you can use the
:class:`~airflow.providers.google.marketing_platform.operators.analytics.GoogleAnalyticsRetrieveAdsLinksListOperator`.

.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_analytics.py
:language: python
:dedent: 4
:start-after: [START howto_marketing_platform_retrieve_ads_links_list_operator]
:end-before: [END howto_marketing_platform_retrieve_ads_links_list_operator]

You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.analytics.GoogleAnalyticsRetrieveAdsLinksListOperator`
33 changes: 33 additions & 0 deletions tests/providers/google/marketing_platform/hooks/test_analytics.py
Expand Up @@ -26,6 +26,8 @@


class TestGoogleAnalyticsHook(unittest.TestCase):
NUM_RETRIES = 5

def setUp(self):
with mock.patch(
"airflow.providers.google.cloud.hooks.base.CloudBaseHook.__init__",
Expand Down Expand Up @@ -74,3 +76,34 @@ def test_list_accounts_for_multiple_pages(self, get_conn_mock):
]
list_accounts = self.hook.list_accounts()
self.assertEqual(list_accounts, ["a", "b"])

@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"analytics.GoogleAnalyticsHook.get_conn"
)
def test_list_ad_words_links(self, get_conn_mock):
account_id = "the_knight_who_says_ni!"
web_property_id = "web_property_id"
mock_ads_links = get_conn_mock.return_value.management.return_value.webPropertyAdWordsLinks
mock_list = mock_ads_links.return_value.list
mock_execute = mock_list.return_value.execute
mock_execute.return_value = {"items": ["a", "b"], "totalResults": 2}
list_ads_links = self.hook.list_ad_words_links(account_id=account_id, web_property_id=web_property_id)
self.assertEqual(list_ads_links, ["a", "b"])

@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"analytics.GoogleAnalyticsHook.get_conn"
)
def test_list_ad_words_links_for_multiple_pages(self, get_conn_mock):
account_id = "the_knight_who_says_ni!"
web_property_id = "web_property_id"
mock_ads_links = get_conn_mock.return_value.management.return_value.webPropertyAdWordsLinks
mock_list = mock_ads_links.return_value.list
mock_execute = mock_list.return_value.execute
mock_execute.side_effect = [
{"items": ["a"], "totalResults": 2},
{"items": ["b"], "totalResults": 2},
]
list_ads_links = self.hook.list_ad_words_links(account_id=account_id, web_property_id=web_property_id)
self.assertEqual(list_ads_links, ["a", "b"])
Expand Up @@ -19,7 +19,7 @@
from unittest import mock

from airflow.providers.google.marketing_platform.operators.analytics import (
GoogleAnalyticsListAccountsOperator,
GoogleAnalyticsListAccountsOperator, GoogleAnalyticsRetrieveAdsLinksListOperator,
)

API_VERSION = "api_version"
Expand All @@ -41,3 +41,30 @@ def test_execute(self, hook_mock):
op.execute(context=None)
hook_mock.assert_called_once()
hook_mock.return_value.list_accounts.assert_called_once()


class TestGoogleAnalyticsRetrieveAdsLinksListOperator(unittest.TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"analytics.GoogleAnalyticsHook"
)
def test_execute(self, hook_mock):
account_id = "the_knight_who_says_ni!"
web_property_id = "42"

op = GoogleAnalyticsRetrieveAdsLinksListOperator(
account_id=account_id,
web_property_id=web_property_id,
api_version=API_VERSION,
gcp_connection_id=GCP_CONN_ID,
task_id="test_task",
)
op.execute(context=None)
hook_mock.assert_called_once()
hook_mock.return_value.list_ad_words_links.assert_called_once()
hook_mock.assert_called_once_with(
gcp_connection_id=GCP_CONN_ID, api_version=API_VERSION
)
hook_mock.return_value.list_ad_words_links.assert_called_once_with(
account_id=account_id, web_property_id=web_property_id
)

0 comments on commit 8c55966

Please sign in to comment.