Skip to content

Commit

Permalink
[AIRFLOW-6752] Add GoogleAnalyticsRetrieveAdsLinksListOperator (#7748)
Browse files Browse the repository at this point in the history
fixes after review

Co-authored-by: michalslowikowski00 <michal.slowikowski@polidea.com>
  • Loading branch information
michalslowikowski00 and michalslowikowski00 committed Mar 20, 2020
1 parent 6c4e90d commit 5106a29
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 12 deletions.
Expand Up @@ -17,15 +17,19 @@
"""
Example Airflow DAG that shows how to use Google Analytics 360.
"""
import os

from airflow import models
from airflow.providers.google.marketing_platform.operators.analytics import (
GoogleAnalyticsListAccountsOperator,
GoogleAnalyticsListAccountsOperator, GoogleAnalyticsRetrieveAdsLinksListOperator,
)
from airflow.utils import dates

default_args = {"start_date": dates.days_ago(1)}

ACCOUNT_ID = os.environ.get("GA_ACCOUNT_ID", "123456789")
WEB_PROPERTY = os.environ.get("GA_WEB_PROPERTY", "UA-12345678-1")

with models.DAG(
"example_google_analytics",
default_args=default_args,
Expand All @@ -34,3 +38,9 @@
# [START howto_marketing_platform_list_accounts_operator]
list_account = GoogleAnalyticsListAccountsOperator(task_id="list_account")
# [END howto_marketing_platform_list_accounts_operator]

# [START howto_marketing_platform_retrieve_ads_links_list_operator]
list_ad_link = GoogleAnalyticsRetrieveAdsLinksListOperator(task_id="list_ad_link",
account_id=ACCOUNT_ID,
web_property_id=WEB_PROPERTY)
# [END howto_marketing_platform_retrieve_ads_links_list_operator]
39 changes: 37 additions & 2 deletions airflow/providers/google/marketing_platform/hooks/analytics.py
Expand Up @@ -30,7 +30,7 @@ class GoogleAnalyticsHook(CloudBaseHook):
def __init__(
self,
api_version: str = "v3",
gcp_connection_id: str = "google cloud default",
gcp_connection_id: str = "google_cloud_default",
*args,
**kwargs
):
Expand Down Expand Up @@ -66,10 +66,45 @@ def list_accounts(self) -> List[Dict[str, Any]]:
# start index has value 1
request = accounts.list(start_index=len(result) + 1)
response = request.execute(num_retries=self.num_retries)
result.extend(response.get('items', []))
result.extend(response.get("items", []))
# result is the number of fetched accounts from Analytics
# when all accounts will be add to the result
# the loop will be break
if response["totalResults"] <= len(result):
break
return result

def list_ad_words_links(
self, account_id: str, web_property_id: str
) -> List[Dict[str, Any]]:
"""
Lists webProperty-Google Ads links for a given web property.
:param account_id: ID of the account which the given web property belongs to.
:type account_id: str
:param web_property_id: Web property UA-string to retrieve the Google Ads links for.
:type web_property_id: str
:returns: list of entity Google Ads links.
:rtype: list
"""

self.log.info("Retrieving ad words list...")
result = [] # type: List[Dict]
conn = self.get_conn()
ads_links = conn.management().webPropertyAdWordsLinks() # pylint: disable=no-member
while True:
# start index has value 1
request = ads_links.list(
accountId=account_id,
webPropertyId=web_property_id,
start_index=len(result) + 1,
)
response = request.execute(num_retries=self.num_retries)
result.extend(response.get("items", []))
# result is the number of fetched links from Analytics
# when all links will be added to the result
# the loop will break
if response["totalResults"] <= len(result):
break
return result
74 changes: 66 additions & 8 deletions airflow/providers/google/marketing_platform/operators/analytics.py
Expand Up @@ -44,21 +44,79 @@ class GoogleAnalyticsListAccountsOperator(BaseOperator):
:type gcp_conn_id: str
"""

template_fields = ("api_version", "gcp_connection_id",)
template_fields = (
"api_version",
"gcp_connection_id",
)

@apply_defaults
def __init__(self,
api_version: str = "v3",
gcp_connection_id: str = "google_cloud_default",
*args,
**kwargs):
def __init__(
self,
api_version: str = "v3",
gcp_connection_id: str = "google_cloud_default",
*args,
**kwargs
):
super().__init__(*args, **kwargs)

self.api_version = api_version
self.gcp_connection_id = gcp_connection_id

def execute(self, context):
hook = GoogleAnalyticsHook(api_version=self.api_version,
gcp_connection_id=self.gcp_connection_id)
hook = GoogleAnalyticsHook(
api_version=self.api_version, gcp_connection_id=self.gcp_connection_id
)
result = hook.list_accounts()
return result


class GoogleAnalyticsRetrieveAdsLinksListOperator(BaseOperator):
"""
Lists webProperty-Google Ads links for a given web property
.. seealso::
Check official API docs:
https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/webPropertyAdWordsLinks/list#http-request
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GoogleAnalyticsListAccountsOperator`
:param account_id: ID of the account which the given web property belongs to.
:type account_id: str
:param web_property_id: Web property UA-string to retrieve the Google Ads links for.
:type web_property_id: str
"""

template_fields = (
"api_version",
"gcp_connection_id",
"account_id",
"web_property_id",
)

@apply_defaults
def __init__(
self,
account_id: str,
web_property_id: str,
api_version: str = "v3",
gcp_connection_id: str = "google_cloud_default",
*args,
**kwargs
):
super().__init__(*args, **kwargs)

self.account_id = account_id
self.web_property_id = web_property_id
self.api_version = api_version
self.gcp_connection_id = gcp_connection_id

def execute(self, context):
hook = GoogleAnalyticsHook(
api_version=self.api_version, gcp_connection_id=self.gcp_connection_id
)
result = hook.list_ad_words_links(
account_id=self.account_id, web_property_id=self.web_property_id,
)
return result
16 changes: 16 additions & 0 deletions docs/howto/operator/gcp/analytics.rst
Expand Up @@ -48,3 +48,19 @@ To list accounts from Analytics you can use the

You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.analytics.GoogleAnalyticsListAccountsOperator`

List Google Ads Links
^^^^^^^^^^^^^^^^^^^^^

Operator returns a list of entity Google Ads links.
To list Google Ads links you can use the
:class:`~airflow.providers.google.marketing_platform.operators.analytics.GoogleAnalyticsRetrieveAdsLinksListOperator`.

.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_analytics.py
:language: python
:dedent: 4
:start-after: [START howto_marketing_platform_retrieve_ads_links_list_operator]
:end-before: [END howto_marketing_platform_retrieve_ads_links_list_operator]

You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.analytics.GoogleAnalyticsRetrieveAdsLinksListOperator`
33 changes: 33 additions & 0 deletions tests/providers/google/marketing_platform/hooks/test_analytics.py
Expand Up @@ -26,6 +26,8 @@


class TestGoogleAnalyticsHook(unittest.TestCase):
NUM_RETRIES = 5

def setUp(self):
with mock.patch(
"airflow.providers.google.cloud.hooks.base.CloudBaseHook.__init__",
Expand Down Expand Up @@ -74,3 +76,34 @@ def test_list_accounts_for_multiple_pages(self, get_conn_mock):
]
list_accounts = self.hook.list_accounts()
self.assertEqual(list_accounts, ["a", "b"])

@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"analytics.GoogleAnalyticsHook.get_conn"
)
def test_list_ad_words_links(self, get_conn_mock):
account_id = "the_knight_who_says_ni!"
web_property_id = "web_property_id"
mock_ads_links = get_conn_mock.return_value.management.return_value.webPropertyAdWordsLinks
mock_list = mock_ads_links.return_value.list
mock_execute = mock_list.return_value.execute
mock_execute.return_value = {"items": ["a", "b"], "totalResults": 2}
list_ads_links = self.hook.list_ad_words_links(account_id=account_id, web_property_id=web_property_id)
self.assertEqual(list_ads_links, ["a", "b"])

@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"analytics.GoogleAnalyticsHook.get_conn"
)
def test_list_ad_words_links_for_multiple_pages(self, get_conn_mock):
account_id = "the_knight_who_says_ni!"
web_property_id = "web_property_id"
mock_ads_links = get_conn_mock.return_value.management.return_value.webPropertyAdWordsLinks
mock_list = mock_ads_links.return_value.list
mock_execute = mock_list.return_value.execute
mock_execute.side_effect = [
{"items": ["a"], "totalResults": 2},
{"items": ["b"], "totalResults": 2},
]
list_ads_links = self.hook.list_ad_words_links(account_id=account_id, web_property_id=web_property_id)
self.assertEqual(list_ads_links, ["a", "b"])
Expand Up @@ -19,7 +19,7 @@
from unittest import mock

from airflow.providers.google.marketing_platform.operators.analytics import (
GoogleAnalyticsListAccountsOperator,
GoogleAnalyticsListAccountsOperator, GoogleAnalyticsRetrieveAdsLinksListOperator,
)

API_VERSION = "api_version"
Expand All @@ -41,3 +41,30 @@ def test_execute(self, hook_mock):
op.execute(context=None)
hook_mock.assert_called_once()
hook_mock.return_value.list_accounts.assert_called_once()


class TestGoogleAnalyticsRetrieveAdsLinksListOperator(unittest.TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"analytics.GoogleAnalyticsHook"
)
def test_execute(self, hook_mock):
account_id = "the_knight_who_says_ni!"
web_property_id = "42"

op = GoogleAnalyticsRetrieveAdsLinksListOperator(
account_id=account_id,
web_property_id=web_property_id,
api_version=API_VERSION,
gcp_connection_id=GCP_CONN_ID,
task_id="test_task",
)
op.execute(context=None)
hook_mock.assert_called_once()
hook_mock.return_value.list_ad_words_links.assert_called_once()
hook_mock.assert_called_once_with(
gcp_connection_id=GCP_CONN_ID, api_version=API_VERSION
)
hook_mock.return_value.list_ad_words_links.assert_called_once_with(
account_id=account_id, web_property_id=web_property_id
)
@@ -0,0 +1,34 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pytest

from tests.providers.google.cloud.utils.gcp_authenticator import GMP_KEY
from tests.test_utils.gcp_system_helpers import MARKETING_DAG_FOLDER, GoogleSystemTest, provide_gcp_context

# Required scopes
SCOPES = [
'https://www.googleapis.com/auth/analytics',
'https://www.googleapis.com/auth/analytics.edit',
'https://www.googleapis.com/auth/cloud-platform',
]
@pytest.mark.system("google.marketing_platform")
@pytest.mark.credential_file(GMP_KEY)
class TestSystemGoogleAds(GoogleSystemTest):
@provide_gcp_context(GMP_KEY, scopes=SCOPES)
def test_run_example_dag(self):
self.run_dag('example_google_analytics', MARKETING_DAG_FOLDER)

0 comments on commit 5106a29

Please sign in to comment.