-
Notifications
You must be signed in to change notification settings - Fork 13.7k
/
search_ads.py
91 lines (76 loc) · 3.02 KB
/
search_ads.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Search Ads 360 hook."""
from __future__ import annotations
from typing import Any, Sequence
from googleapiclient.discovery import build
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
class GoogleSearchAdsHook(GoogleBaseHook):
"""Hook for Google Search Ads 360."""
_conn: build | None = None
def __init__(
self,
api_version: str = "v2",
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
) -> None:
super().__init__(
gcp_conn_id=gcp_conn_id,
delegate_to=delegate_to,
impersonation_chain=impersonation_chain,
)
self.api_version = api_version
def get_conn(self):
"""Retrieve connection to Google SearchAds."""
if not self._conn:
http_authorized = self._authorize()
self._conn = build(
"doubleclicksearch",
self.api_version,
http=http_authorized,
cache_discovery=False,
)
return self._conn
def insert_report(self, report: dict[str, Any]) -> Any:
"""
Insert a report request into the reporting system.
:param report: Report to be generated.
"""
response = self.get_conn().reports().request(body=report).execute(num_retries=self.num_retries)
return response
def get(self, report_id: str) -> Any:
"""
Poll for the status of a report request.
:param report_id: ID of the report request being polled.
"""
response = self.get_conn().reports().get(reportId=report_id).execute(num_retries=self.num_retries)
return response
def get_file(self, report_fragment: int, report_id: str) -> Any:
"""
Download a report file encoded in UTF-8.
:param report_fragment: The index of the report fragment to download.
:param report_id: ID of the report.
"""
response = (
self.get_conn()
.reports()
.getFile(reportFragment=report_fragment, reportId=report_id)
.execute(num_retries=self.num_retries)
)
return response