Skip to content

Commit

Permalink
✨ Source Bing Ads: custom reports (#32306)
Browse files Browse the repository at this point in the history
Co-authored-by: darynaishchenko <darynaishchenko@users.noreply.github.com>
  • Loading branch information
darynaishchenko and darynaishchenko committed Nov 14, 2023
1 parent f65569e commit dc48c4f
Show file tree
Hide file tree
Showing 8 changed files with 516 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 47f25999-dd5e-4636-8c39-e7cea2453331
dockerImageTag: 1.12.1
dockerImageTag: 1.13.0
dockerRepository: airbyte/source-bing-ads
documentationUrl: https://docs.airbyte.com/integrations/sources/bing-ads
githubIssueLabel: source-bing-ads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Client:
# https://docs.microsoft.com/en-us/advertising/guides/services-protocol?view=bingads-13#throttling
# https://docs.microsoft.com/en-us/advertising/guides/operation-error-codes?view=bingads-13
retry_on_codes: Iterator[str] = ["117", "207", "4204", "109", "0"]
max_retries: int = 10
max_retries: int = 5
# A backoff factor to apply between attempts after the second try
# {retry_factor} * (2 ** ({number of total retries} - 1))
retry_factor: int = 15
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from itertools import product
from typing import Any, List, Mapping, Tuple
from typing import Any, List, Mapping, Optional, Tuple

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import FailureType, SyncMode
Expand Down Expand Up @@ -41,6 +41,7 @@
AgeGenderAudienceReportWeekly,
AppInstallAdLabels,
AppInstallAds,
BingAdsReportingServiceStream,
BudgetSummaryReport,
CampaignImpressionPerformanceReportDaily,
CampaignImpressionPerformanceReportHourly,
Expand All @@ -52,6 +53,7 @@
CampaignPerformanceReportMonthly,
CampaignPerformanceReportWeekly,
Campaigns,
CustomReport,
GeographicPerformanceReportDaily,
GeographicPerformanceReportHourly,
GeographicPerformanceReportMonthly,
Expand Down Expand Up @@ -83,17 +85,49 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
try:
client = Client(**config)
account_ids = {str(account["Id"]) for account in Accounts(client, config).read_records(SyncMode.full_refresh)}
self.validate_custom_reposts(config, client)
if account_ids:
return True, None
else:
raise AirbyteTracedException(
message="Config validation error: You don't have accounts assigned to this user.",
message="Config validation error: You don't have accounts assigned to this user. Please verify your developer token.",
internal_message="You don't have accounts assigned to this user.",
failure_type=FailureType.config_error,
)
except Exception as error:
return False, error

def validate_custom_reposts(self, config: Mapping[str, Any], client: Client):
custom_reports = self.get_custom_reports(config, client)
for custom_report in custom_reports:
is_valid, reason = custom_report.validate_report_configuration()
if not is_valid:
raise AirbyteTracedException(
message=f"Config validation error: {custom_report.name}: {reason}",
internal_message=f"{custom_report.name}: {reason}",
failure_type=FailureType.config_error,
)

def _clear_reporting_object_name(self, report_object: str) -> str:
# reporting mixin adds it
if report_object.endswith("Request"):
return report_object.replace("Request", "")
return report_object

def get_custom_reports(self, config: Mapping[str, Any], client: Client) -> List[Optional[Stream]]:
return [
type(
report["name"],
(CustomReport,),
{
"report_name": self._clear_reporting_object_name(report["reporting_object"]),
"custom_report_columns": report["report_columns"],
"report_aggregation": report["report_aggregation"],
},
)(client, config)
for report in config.get("custom_reports", [])
]

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
client = Client(**config)
streams = [
Expand Down Expand Up @@ -127,4 +161,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
)
report_aggregation = ("Hourly", "Daily", "Weekly", "Monthly")
streams.extend([eval(f"{report}{aggregation}")(client, config) for (report, aggregation) in product(reports, report_aggregation)])

custom_reports = self.get_custom_reports(config, client)
streams.extend(custom_reports)
return streams
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,105 @@
"minimum": 0,
"maximum": 90,
"order": 6
},
"custom_reports": {
"title": "Custom Reports",
"description": "You can add your Custom Bing Ads report by creating one.",
"order": 7,
"type": "array",
"items": {
"title": "Custom Report Config",
"type": "object",
"properties": {
"name": {
"title": "Report Name",
"description": "The name of the custom report, this name would be used as stream name",
"type": "string",
"examples": [
"Account Performance",
"AdDynamicTextPerformanceReport",
"custom report"
]
},
"reporting_object": {
"title": "Reporting Data Object",
"description": "The name of the the object derives from the ReportRequest object. You can find it in Bing Ads Api docs - Reporting API - Reporting Data Objects.",
"type": "string",
"enum": [
"AccountPerformanceReportRequest",
"AdDynamicTextPerformanceReportRequest",
"AdExtensionByAdReportRequest",
"AdExtensionByKeywordReportRequest",
"AdExtensionDetailReportRequest",
"AdGroupPerformanceReportRequest",
"AdPerformanceReportRequest",
"AgeGenderAudienceReportRequest",
"AudiencePerformanceReportRequest",
"CallDetailReportRequest",
"CampaignPerformanceReportRequest",
"ConversionPerformanceReportRequest",
"DestinationUrlPerformanceReportRequest",
"DSAAutoTargetPerformanceReportRequest",
"DSACategoryPerformanceReportRequest",
"DSASearchQueryPerformanceReportRequest",
"GeographicPerformanceReportRequest",
"GoalsAndFunnelsReportRequest",
"HotelDimensionPerformanceReportRequest",
"HotelGroupPerformanceReportRequest",
"KeywordPerformanceReportRequest",
"NegativeKeywordConflictReportRequest",
"ProductDimensionPerformanceReportRequest",
"ProductMatchCountReportRequest",
"ProductNegativeKeywordConflictReportRequest",
"ProductPartitionPerformanceReportRequest",
"ProductPartitionUnitPerformanceReportRequest",
"ProductSearchQueryPerformanceReportRequest",
"ProfessionalDemographicsAudienceReportRequest",
"PublisherUsagePerformanceReportRequest",
"SearchCampaignChangeHistoryReportRequest",
"SearchQueryPerformanceReportRequest",
"ShareOfVoiceReportRequest",
"UserLocationPerformanceReportRequest"
]
},
"report_columns": {
"title": "Columns",
"description": "A list of available report object columns. You can find it in description of reporting object that you want to add to custom report.",
"type": "array",
"items": {
"description": "Name of report column.",
"type": "string"
},
"minItems": 1
},
"report_aggregation": {
"title": "Aggregation",
"description": "A list of available aggregations.",
"type": "string",
"items": {
"title": "ValidEnums",
"description": "An enumeration of aggregations.",
"enum": [
"Hourly",
"Daily",
"Weekly",
"Monthly",
"DayOfWeek",
"HourOfDay",
"WeeklyStartingMonday",
"Summary"
]
},
"default": ["Hourly"]
}
},
"required": [
"name",
"reporting_object",
"report_columns",
"report_aggregation"
]
}
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import os
import re
import ssl
import time
import xml.etree.ElementTree as ET
from abc import ABC, abstractmethod
from datetime import timezone
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
from urllib.error import URLError
from urllib.parse import urlparse

import _csv
import pandas as pd
Expand All @@ -16,6 +19,7 @@
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from bingads.service_client import ServiceClient
from bingads.v13.internal.reporting.row_report import _RowReport
from bingads.v13.internal.reporting.row_report_iterator import _RowReportRecord
from bingads.v13.reporting.reporting_service_manager import ReportingServiceManager
from numpy import nan
Expand All @@ -32,7 +36,7 @@
PerformanceReportsMixin,
ReportsMixin,
)
from suds import sudsobject
from suds import WebFault, sudsobject


class BingAdsBaseStream(Stream, ABC):
Expand Down Expand Up @@ -1260,3 +1264,95 @@ class UserLocationPerformanceReportWeekly(UserLocationPerformanceReport):

class UserLocationPerformanceReportMonthly(UserLocationPerformanceReport):
report_aggregation = "Monthly"


class CustomReport(PerformanceReportsMixin, BingAdsReportingServiceStream, ABC):
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
custom_report_columns = []
report_schema_name = None
primary_key = None

@property
def cursor_field(self) -> Union[str, List[str]]:
# Summary aggregation doesn't include TimePeriod field
if self.report_aggregation != "Summary":
return "TimePeriod"

@property
def report_columns(self):
# adding common and default columns
if "AccountId" not in self.custom_report_columns:
self.custom_report_columns.append("AccountId")
if self.cursor_field and self.cursor_field not in self.custom_report_columns:
self.custom_report_columns.append(self.cursor_field)
return list(frozenset(self.custom_report_columns))

def get_json_schema(self) -> Mapping[str, Any]:
columns_schema = {col: {"type": ["null", "string"]} for col in self.report_columns}
schema: Mapping[str, Any] = {
"$schema": "https://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"additionalProperties": True,
"properties": columns_schema,
}
return schema

def validate_report_configuration(self) -> Tuple[bool, str]:
# gets /bingads/v13/proxies/production/reporting_service.xml
reporting_service_file = self.client.get_service(self.service_name)._get_service_info_dict(self.client.api_version)[
("reporting", self.client.environment)
]
tree = ET.parse(urlparse(reporting_service_file).path)
request_object = tree.find(f".//{{*}}complexType[@name='{self.report_name}Request']")

report_object_columns = self._get_object_columns(request_object, tree)
is_custom_cols_in_report_object_cols = all(x in report_object_columns for x in self.custom_report_columns)

if not is_custom_cols_in_report_object_cols:
return False, (
f"Reporting Columns are invalid. Columns that you provided don't belong to Reporting Data Object Columns:"
f" {self.custom_report_columns}. Please ensure it is correct in Bing Ads Docs."
)

return True, ""

def _clear_namespace(self, type: str) -> str:
return re.sub(r"^[a-z]+:", "", type)

def _get_object_columns(self, request_el: ET.Element, tree: ET.ElementTree) -> List[str]:
column_el = request_el.find(".//{*}element[@name='Columns']")
array_of_columns_name = self._clear_namespace(column_el.get("type"))

array_of_columns_elements = tree.find(f".//{{*}}complexType[@name='{array_of_columns_name}']")
inner_array_of_columns_elements = array_of_columns_elements.find(".//{*}element")
column_el_name = self._clear_namespace(inner_array_of_columns_elements.get("type"))

column_el = tree.find(f".//{{*}}simpleType[@name='{column_el_name}']")
column_enum_items = column_el.findall(".//{*}enumeration")
column_enum_items_values = [el.get("value") for el in column_enum_items]
return column_enum_items_values

def get_report_record_timestamp(self, datestring: str) -> int:
"""
Parse report date field based on aggregation type
"""
if not self.report_aggregation:
date = pendulum.from_format(datestring, "M/D/YYYY")
else:
if self.report_aggregation in ["DayOfWeek", "HourOfDay"]:
return int(datestring)
if self.report_aggregation == "Hourly":
date = pendulum.from_format(datestring, "YYYY-MM-DD|H")
else:
date = pendulum.parse(datestring)

return date.int_timestamp

def send_request(self, params: Mapping[str, Any], customer_id: str, account_id: str) -> _RowReport:
try:
return super().send_request(params, customer_id, account_id)
except WebFault as e:
self.logger.error(
f"Could not sync custom report {self.name}: Please validate your column and aggregation configuration. "
f"Error form server: [{e.fault.faultstring}]"
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import source_bing_ads.client
from airbyte_cdk.utils import AirbyteTracedException
from bingads.authorization import AuthorizationData, OAuthTokens
from bingads.v13.bulk import BulkServiceManager
from bingads.v13.reporting.exceptions import ReportingDownloadException
from suds import sudsobject

Expand Down Expand Up @@ -176,3 +177,15 @@ def test_bulk_service_manager(patched_request_tokens):
client = source_bing_ads.client.Client("tenant_id", "2020-01-01", client_id="client_id", refresh_token="refresh_token")
service = client._bulk_service_manager()
assert (service._poll_interval_in_milliseconds, service._environment) == (5000, client.environment)


def test_get_bulk_entity(requests_mock):
requests_mock.post(
"https://login.microsoftonline.com/tenant_id/oauth2/v2.0/token",
status_code=200,
json={"access_token": "test", "expires_in": "9000", "refresh_token": "test"},
)
client = source_bing_ads.client.Client("tenant_id", "2020-01-01", client_id="client_id", refresh_token="refresh_token")
with patch.object(BulkServiceManager, "download_file", return_value="file.csv"):
bulk_entity = client.get_bulk_entity(data_scope=["EntityData"], download_entities=["AppInstallAds"])
assert bulk_entity == "file.csv"

0 comments on commit dc48c4f

Please sign in to comment.