🎉Source Bing Ads: Add Report streams (#5750)
added bing ads report streams
yaroslav-dudar committed Sep 9, 2021
1 parent 8179992 commit 70513bc
Showing 22 changed files with 1,330 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -2,6 +2,6 @@
"sourceDefinitionId": "47f25999-dd5e-4636-8c39-e7cea2453331",
"name": "Bing Ads",
"dockerRepository": "airbyte/source-bing-ads",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/bing-ads"
}
Original file line number Diff line number Diff line change
@@ -435,7 +435,7 @@
- sourceDefinitionId: 47f25999-dd5e-4636-8c39-e7cea2453331
name: Bing Ads
dockerRepository: airbyte/source-bing-ads
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/sources/bing-ads
- sourceDefinitionId: 59c5501b-9f95-411e-9269-7143c939adbd
name: BigCommerce
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-bing-ads/Dockerfile
Original file line number Diff line number Diff line change
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-bing-ads
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# connector doesn't have incremental streams
# incremental test doesn't work if a single stream has multiple states
#incremental:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
44 changes: 44 additions & 0 deletions airbyte-integrations/connectors/source-bing-ads/bootstrap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@

## Core streams

Bing Ads is a SOAP-based API. The connector is implemented with the [BingAds Python SDK](https://github.com/BingAds/BingAds-Python-SDK).

The connector has the following core streams, all of which support full refresh only:
* [Account](https://docs.microsoft.com/en-us/advertising/customer-management-service/advertiseraccount?view=bingads-13)
* [Campaign](https://docs.microsoft.com/en-us/advertising/campaign-management-service/campaign?view=bingads-13)
* [AdGroup](https://docs.microsoft.com/en-us/advertising/campaign-management-service/getadgroupsbycampaignid?view=bingads-13)
* [Ad](https://docs.microsoft.com/en-us/advertising/campaign-management-service/getadsbyadgroupid?view=bingads-13)


## Report streams

The connector also has report streams, which support incremental sync:

* [AccountPerformanceReport](https://docs.microsoft.com/en-us/advertising/reporting-service/accountperformancereportrequest?view=bingads-13)
* [AdPerformanceReport](https://docs.microsoft.com/en-us/advertising/reporting-service/adperformancereportrequest?view=bingads-13)
* [AdGroupPerformanceReport](https://docs.microsoft.com/en-us/advertising/reporting-service/adgroupperformancereportrequest?view=bingads-13)
* [CampaignPerformanceReport](https://docs.microsoft.com/en-us/advertising/reporting-service/campaignperformancereportrequest?view=bingads-13)
* [BudgetSummaryReport](https://docs.microsoft.com/en-us/advertising/reporting-service/budgetsummaryreportrequest?view=bingads-13)
* [KeywordPerformanceReport](https://docs.microsoft.com/en-us/advertising/reporting-service/keywordperformancereportrequest?view=bingads-13)

Pulling report data takes two separate requests:

* [First](https://docs.microsoft.com/en-us/advertising/reporting-service/submitgeneratereport?view=bingads-13) - to request the appropriate report

* [Second](https://docs.microsoft.com/en-us/advertising/reporting-service/pollgeneratereport?view=bingads-13) - to poll for the actual data. The report download timeout is 5 minutes
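
The submit-then-poll flow can be sketched roughly as follows. This is a minimal illustration, not the connector's code: `fetch_report`, `submit`, and `poll` are hypothetical names, the two callables stand in for the SubmitGenerateReport and PollGenerateReport operations, and the 15-second interval is taken from the client's `report_poll_interval` of 15000 ms.

```python
import time
from typing import Callable, Optional

REPORT_TIMEOUT_SECONDS = 5 * 60  # report download timeout of 5 minutes
POLL_INTERVAL_SECONDS = 15       # mirrors report_poll_interval = 15000 ms


def fetch_report(
    submit: Callable[[], str],
    poll: Callable[[str], Optional[str]],
    timeout: float = REPORT_TIMEOUT_SECONDS,
    interval: float = POLL_INTERVAL_SECONDS,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Submit a report request, then poll until a download URL is returned.

    `submit` and `poll` are hypothetical callables supplied by the caller;
    `poll` returns None while the report is still being generated.
    """
    request_id = submit()  # step 1: request the report
    deadline = clock() + timeout
    while True:
        url = poll(request_id)  # step 2: ask whether the data is ready
        if url is not None:
            return url
        if clock() >= deadline:
            raise TimeoutError(f"report {request_id} not ready after {timeout} seconds")
        sleep(interval)
```

Passing `sleep` and `clock` as parameters keeps the helper easy to exercise without real waiting. In the connector itself the SDK's `ReportingServiceManager` handles the polling.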

Initially, all fields in report streams have string values. The connector uses the `reports.REPORT_FIELD_TYPES` collection to convert values to numeric types where possible.
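
A minimal sketch of this kind of conversion, assuming the collection maps field names to Python types. `FIELD_TYPES` and `cast_row` are hypothetical stand-ins; the real contents and shape of `reports.REPORT_FIELD_TYPES` may differ, and the column names are only illustrative.

```python
from typing import Any, Dict, Mapping

# Hypothetical stand-in for reports.REPORT_FIELD_TYPES (assumed shape).
FIELD_TYPES: Mapping[str, type] = {
    "Impressions": int,
    "Clicks": int,
    "Spend": float,
}


def cast_row(row: Mapping[str, str]) -> Dict[str, Any]:
    """Convert string values to numbers where a type is known and parsing succeeds."""
    converted: Dict[str, Any] = {}
    for name, value in row.items():
        target = FIELD_TYPES.get(name)
        if target is None:
            converted[name] = value  # no known type: keep the string
            continue
        try:
            converted[name] = target(value)
        except (TypeError, ValueError):
            converted[name] = value  # not parseable: keep the original string
    return converted
```

Falling back to the original string on a parse failure matches the "where possible" wording: a value that cannot be parsed is passed through unchanged rather than dropped.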

The connector uses the `reports_start_date` config as the start of the initial report sync and the current date as the end date.
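
The date range for a report request could be derived along these lines. This is a sketch under assumptions: `report_window` is a hypothetical helper, it uses the standard library where the connector parses the date with `pendulum`, and resuming from a saved cursor stored as epoch seconds is an assumption based on the sample state file.

```python
from datetime import datetime, timezone
from typing import Optional, Tuple


def report_window(
    reports_start_date: str,
    state_timestamp: Optional[int] = None,
    now: Optional[datetime] = None,
) -> Tuple[datetime, datetime]:
    """Return the (start, end) range for a report request, both in UTC."""
    end = now or datetime.now(timezone.utc)
    if state_timestamp is not None:
        # Later syncs: resume from the saved cursor (epoch seconds, an assumption).
        start = datetime.fromtimestamp(state_timestamp, tz=timezone.utc)
    else:
        # Initial sync: start from the configured date, read as midnight UTC.
        start = datetime.strptime(reports_start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    if start > end:
        raise ValueError("start of the report window is after its end")
    return start, end
```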

The connector has `hourly_reports`, `daily_reports`, `weekly_reports`, and `monthly_reports` configs, which enable the corresponding report streams, for example `account_performance_report_daily`, `ad_group_performance_report_daily`, and so on. By default, all report streams are disabled.
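
As an illustration of how these flags map to stream names, here is a small sketch. `enabled_report_streams` and both constants are hypothetical. Only the performance reports are listed, because the naming of the budget summary stream is not spelled out here.

```python
from typing import List, Mapping

# Prefixes follow the stream naming shown above, e.g. account_performance_report_daily.
REPORT_PREFIXES = [
    "account_performance_report",
    "ad_group_performance_report",
    "ad_performance_report",
    "campaign_performance_report",
    "keyword_performance_report",
]
AGGREGATIONS = ["hourly", "daily", "weekly", "monthly"]


def enabled_report_streams(config: Mapping[str, bool]) -> List[str]:
    """List report stream names enabled by the *_reports flags (all off by default)."""
    return [
        f"{prefix}_{aggregation}"
        for aggregation in AGGREGATIONS
        if config.get(f"{aggregation}_reports", False)
        for prefix in REPORT_PREFIXES
    ]
```

With `hourly_reports` and `monthly_reports` set to true, as in the sample config, this yields the hourly and monthly variants of each performance report and nothing else.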

## Request caching

Request caching is based on the [vcrpy](https://vcrpy.readthedocs.io/en/latest/) library.

The connector caches requests for these streams:

* Account
* Campaign
* AdGroup
14 changes: 14 additions & 0 deletions airbyte-integrations/connectors/source-bing-ads/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import ru.vyarus.gradle.plugin.python.task.PythonTask

plugins {
id 'airbyte-python'
id 'airbyte-docker'
@@ -7,3 +9,15 @@ plugins {
airbytePython {
moduleDirectory 'source_bing_ads'
}

// setuptools 58.* removed support for use_2to3 which leads to the following issue:
// error in suds-jurko setup command: use_2to3 is invalid.
// https://setuptools.readthedocs.io/en/latest/history.html#v58-0-0
// To work around this issue, the connector needs to use a 57.* version of setuptools
// TODO: Remove this step after resolution of this issue https://github.com/BingAds/BingAds-Python-SDK/issues/191
task("customSetupToolsInstall", type: PythonTask, dependsOn: flakeCheck){
module = "pip"
command = "install setuptools==57.5.0"
}

installLocalReqs.dependsOn("customSetupToolsInstall")
Original file line number Diff line number Diff line change
@@ -35,6 +35,116 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "account_performance_report_weekly",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "account_performance_report_monthly",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ad_group_performance_report_weekly",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ad_group_performance_report_monthly",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ad_performance_report_weekly",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ad_performance_report_monthly",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "budget_summary_report",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "campaign_performance_report_weekly",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "campaign_performance_report_monthly",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "keyword_performance_report_weekly",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "keyword_performance_report_monthly",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
{
"accounts": { "type": "all" },
"accounts": { "selection_strategy": "all" },
"user_id": "2222",
"customer_id": "1111",
"developer_token": "asgag4gwag3",
"refresh_token": "as2Ggas23gsa236gasgaskjfhas7i8ygf78as7osa7gy87asg8as7tg6as",
"client_secret": "1234",
"client_id": "123"
"client_id": "123",
"reports_start_date": "2018-11-13",
"hourly_reports": true,
"daily_reports": false,
"weekly_reports": false,
"monthly_reports": true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"keyword_performance_report_hourly": {
"180278106": {
"TimePeriod": 1627820152
}
},
"budget_summary_report_hourly": {
"180278106": {
"Date": 1627800152
}
},
"ad_performance_report_hourly": {
"180278106": {
"TimePeriod": 1627795152
}
},
"campaign_performance_report_hourly": {
"180278106": {
"TimePeriod": 1727810152
}
}
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-bing-ads/setup.py
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk", "bingads==13.0.10", "vcrpy==4.1.1", "backoff==1.10.0"]
MAIN_REQUIREMENTS = ["airbyte-cdk", "bingads==13.0.10", "vcrpy==4.1.1", "backoff==1.10.0", "pendulum==2.1.2"]

TEST_REQUIREMENTS = [
"pytest~=6.1",
Original file line number Diff line number Diff line change
@@ -23,15 +23,17 @@
#

import sys
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from typing import Any, Iterator, Mapping, Optional

import backoff
import pendulum
from airbyte_cdk.logger import AirbyteLogger
from bingads.authorization import AuthorizationData, OAuthTokens, OAuthWebAuthCodeGrant
from bingads.service_client import ServiceClient
from bingads.util import errorcode_of_exception
from bingads.v13.reporting.reporting_service_manager import ReportingServiceManager
from suds import WebFault, sudsobject


@@ -42,11 +44,15 @@ class Client:
# retry on: rate limit errors, auth token expiration, internal errors
# https://docs.microsoft.com/en-us/advertising/guides/services-protocol?view=bingads-13#throttling
# https://docs.microsoft.com/en-us/advertising/guides/operation-error-codes?view=bingads-13
retry_on_codes: Iterator[int] = [117, 207, 4204, 109, 0]
retry_on_codes: Iterator[str] = ["117", "207", "4204", "109", "0"]
max_retries: int = 3
# A backoff factor to apply between attempts after the second try
# {retry_factor} * (2 ** ({number of total retries} - 1))
retry_factor: int = 15
# environments supported by Microsoft Advertising: sandbox, production
environment: str = "production"
# The time interval in milliseconds between two status polling attempts.
report_poll_interval: int = 15000

def __init__(
self,
@@ -55,6 +61,11 @@ def __init__(
client_secret: str,
client_id: str,
refresh_token: str,
reports_start_date: str,
hourly_reports: bool,
daily_reports: bool,
weekly_reports: bool,
monthly_reports: bool,
**kwargs: Mapping[str, Any],
) -> None:
self.authorization_data: Mapping[str, AuthorizationData] = {}
@@ -67,8 +78,13 @@ def __init__(
self.refresh_token = refresh_token
self.customer_id = customer_id
self.developer_token = developer_token
self.hourly_reports = hourly_reports
self.daily_reports = daily_reports
self.weekly_reports = weekly_reports
self.monthly_reports = monthly_reports

self.oauth: OAuthTokens = self._get_access_token()
self.reports_start_date = pendulum.parse(reports_start_date).astimezone(tz=timezone.utc)

@lru_cache(maxsize=None)
def _get_auth_data(self, account_id: Optional[str] = None) -> AuthorizationData:
Expand All @@ -95,18 +111,19 @@ def is_token_expiring(self) -> bool:
return False if token_updated_expires_in > self.refresh_token_safe_delta else True

def should_retry(self, error: WebFault) -> bool:
error_code = errorcode_of_exception(error)
error_code = str(errorcode_of_exception(error))
give_up = error_code not in self.retry_on_codes
if give_up:
self.logger.info(f"Giving up for returned error code: {error_code}")
self.logger.error(f"Giving up for returned error code: {error_code}. Error details: {self._get_error_message(error)}")
return give_up

def _get_error_message(self, error: WebFault) -> str:
return str(self.asdict(error.fault)) if hasattr(error, "fault") else str(error)

def log_retry_attempt(self, details: Mapping[str, Any]) -> None:
_, exc, _ = sys.exc_info()
error = self.asdict(exc.fault) if hasattr(exc, "fault") else exc

self.logger.info(
f"Caught retryable error: {str(error)} after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
f"Caught retryable error: {self._get_error_message(exc)} after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
)

def request(self, **kwargs: Mapping[str, Any]) -> Mapping[str, Any]:
@@ -122,18 +139,23 @@ def request(self, **kwargs: Mapping[str, Any]) -> Mapping[str, Any]:

def _request(
self,
service_name: str,
service_name: Optional[str],
operation_name: str,
account_id: Optional[str],
params: Mapping[str, Any],
is_report_service: bool = False,
) -> Mapping[str, Any]:
"""
Executes appropriate Service Operation on Bing Ads API
"""
if self.is_token_expiring():
self.oauth = self._get_access_token()

service = self.get_service(service_name=service_name, account_id=account_id)
if is_report_service:
service = self._get_reporting_service(account_id=account_id)
else:
service = self.get_service(service_name=service_name, account_id=account_id)

return getattr(service, operation_name)(**params)

@lru_cache(maxsize=None)
@@ -146,8 +168,18 @@ def get_service(
service=service_name,
version=self.api_version,
authorization_data=self._get_auth_data(account_id),
# environments supported by Microsoft Advertising: sandbox, production
environment="production",
environment=self.environment,
)

@lru_cache(maxsize=None)
def _get_reporting_service(
self,
account_id: Optional[str] = None,
) -> ServiceClient:
return ReportingServiceManager(
authorization_data=self._get_auth_data(account_id),
poll_interval_in_milliseconds=self.report_poll_interval,
environment=self.environment,
)

@classmethod