Skip to content

Commit

Permalink
[AIRFLOW-4971] Add Google Display & Video 360 integration (#6170)
Browse files Browse the repository at this point in the history
  • Loading branch information
turbaszek authored and mik-laj committed Oct 22, 2019
1 parent 4e661f5 commit 16d7acc
Show file tree
Hide file tree
Showing 12 changed files with 1,219 additions and 0 deletions.
@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that shows how to use DisplayVideo.
"""

from airflow import models
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
)
from airflow.providers.google.marketing_platform.sensors.display_video import (
GoogleDisplayVideo360ReportSensor,
)
from airflow.utils import dates

# [START howto_display_video_env_variables]
BUCKET = "gs://test-display-video-bucket"
REPORT = {
"kind": "doubleclickbidmanager#query",
"metadata": {
"title": "Polidea Test Report",
"dataRange": "LAST_7_DAYS",
"format": "CSV",
"sendNotification": False,
},
"params": {
"type": "TYPE_GENERAL",
"groupBys": ["FILTER_DATE", "FILTER_PARTNER"],
"filters": [{"type": "FILTER_PARTNER", "value": 1486931}],
"metrics": ["METRIC_IMPRESSIONS", "METRIC_CLICKS"],
"includeInviteData": True,
},
"schedule": {"frequency": "ONE_TIME"},
}

PARAMS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}
# [END howto_display_video_env_variables]

default_args = {"start_date": dates.days_ago(1)}

with models.DAG(
"example_display_video",
default_args=default_args,
schedule_interval=None, # Override to match your needs
) as dag:
# [START howto_google_display_video_createquery_report_operator]
create_report = GoogleDisplayVideo360CreateReportOperator(
body=REPORT, task_id="create_report"
)
report_id = "{{ task_instance.xcom_pull('create_report', key='report_id') }}"
# [END howto_google_display_video_createquery_report_operator]

# [START howto_google_display_video_runquery_report_operator]
run_report = GoogleDisplayVideo360RunReportOperator(
report_id=report_id, params=PARAMS, task_id="run_report"
)
# [END howto_google_display_video_runquery_report_operator]

# [START howto_google_display_video_wait_report_operator]
wait_for_report = GoogleDisplayVideo360ReportSensor(
task_id="wait_for_report", report_id=report_id
)
# [END howto_google_display_video_wait_report_operator]

# [START howto_google_display_video_getquery_report_operator]
get_report = GoogleDisplayVideo360DownloadReportOperator(
report_id=report_id,
task_id="get_report",
bucket_name=BUCKET,
report_name="test1.csv",
)
# [END howto_google_display_video_getquery_report_operator]

# [START howto_google_display_video_deletequery_report_operator]
delete_report = GoogleDisplayVideo360DeleteReportOperator(
report_id=report_id, task_id="delete_report"
)
# [END howto_google_display_video_deletequery_report_operator]

create_report >> run_report >> wait_for_report >> get_report >> delete_report
130 changes: 130 additions & 0 deletions airflow/providers/google/marketing_platform/hooks/display_video.py
@@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains Google DisplayVideo hook.
"""
from typing import Any, Dict, List, Optional

from googleapiclient.discovery import Resource, build

from airflow.gcp.hooks.base import GoogleCloudBaseHook


class GoogleDisplayVideo360Hook(GoogleCloudBaseHook):
"""
Hook for Google Display & Video 360.
"""

_conn = None # type: Optional[Any]

def __init__(
self,
api_version: str = "v1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
) -> None:
super().__init__(gcp_conn_id, delegate_to)
self.api_version = api_version

def get_conn(self) -> Resource:
"""
Retrieves connection to DisplayVideo.
"""
if not self._conn:
http_authorized = self._authorize()
self._conn = build(
"doubleclickbidmanager",
self.api_version,
http=http_authorized,
cache_discovery=False,
)
return self._conn

def create_query(self, query: Dict[str, Any]) -> Dict:
"""
Creates a query.
:param query: Query object to be passed to request body.
:type query: Dict[str, Any]
"""
response = (
self.get_conn() # pylint:disable=no-member
.queries()
.createquery(body=query)
.execute(num_retries=self.num_retries)
)
return response

def delete_query(self, query_id: str) -> None:
"""
Deletes a stored query as well as the associated stored reports.
:param query_id: Query ID to delete.
:type query_id: str
"""
(
self.get_conn() # pylint:disable=no-member
.queries()
.deletequery(queryId=query_id)
.execute(num_retries=self.num_retries)
)

def get_query(self, query_id: str) -> Dict:
"""
Retrieves a stored query.
:param query_id: Query ID to retrieve.
:type query_id: str
"""
response = (
self.get_conn() # pylint:disable=no-member
.queries()
.getquery(queryId=query_id)
.execute(num_retries=self.num_retries)
)
return response

def list_queries(self, ) -> List[Dict]:
"""
Retrieves stored queries.
"""
response = (
self.get_conn() # pylint:disable=no-member
.queries()
.listqueries()
.execute(num_retries=self.num_retries)
)
return response.get('queries', [])

def run_query(self, query_id: str, params: Dict[str, Any]) -> None:
"""
Runs a stored query to generate a report.
:param query_id: Query ID to run.
:type query_id: str
:param params: Parameters for the report.
:type params: Dict[str, Any]
"""
(
self.get_conn() # pylint:disable=no-member
.queries()
.runquery(queryId=query_id, body=params)
.execute(num_retries=self.num_retries)
)

0 comments on commit 16d7acc

Please sign in to comment.