🎉Source Bing Ads: Add Report streams #5750

Merged
merged 20 commits into from
Sep 9, 2021
Changes from 16 commits
0cf147b
initial report streams
yaroslav-dudar Aug 18, 2021
6df7000
refactoring; added performance streams; start_date param
yaroslav-dudar Aug 23, 2021
ca549ef
basic incremental sync for reports
yaroslav-dudar Aug 24, 2021
4648166
Merge branch 'master' of github.com:airbytehq/airbyte into yaroslav-d…
yaroslav-dudar Aug 25, 2021
04cba0b
added cursor calculation based of aggregated report data; refactoring
yaroslav-dudar Aug 25, 2021
9588051
updated state management; added schemas for reports
yaroslav-dudar Aug 26, 2021
cbea441
upd PK; fix acceptance tests; improve error logging
yaroslav-dudar Aug 28, 2021
0e6fc48
added unit tests
yaroslav-dudar Aug 29, 2021
986a505
upd spec; added unit tests; version bump
yaroslav-dudar Aug 31, 2021
d31e2c0
Merge branch 'master' of github.com:airbytehq/airbyte into yaroslav-d…
yaroslav-dudar Aug 31, 2021
d8b0923
minor fixes; refactoring
yaroslav-dudar Sep 1, 2021
94beee7
Merge branch 'master' of github.com:airbytehq/airbyte into yaroslav-d…
yaroslav-dudar Sep 1, 2021
5f17a53
typing fix
yaroslav-dudar Sep 2, 2021
66d69d5
added bootstrap
yaroslav-dudar Sep 6, 2021
96bac88
Merge branch 'master' of github.com:airbytehq/airbyte into yaroslav-d…
yaroslav-dudar Sep 7, 2021
071ade4
upd spec; added report per agg
yaroslav-dudar Sep 7, 2021
e992852
modified schema fetching and streams selection
yaroslav-dudar Sep 7, 2021
1af74dc
upd docs; fixed tests
yaroslav-dudar Sep 8, 2021
e1c17c0
added custom setuptools version
yaroslav-dudar Sep 9, 2021
f6c7383
upd primary_keys
yaroslav-dudar Sep 9, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "47f25999-dd5e-4636-8c39-e7cea2453331",
"name": "Bing Ads",
"dockerRepository": "airbyte/source-bing-ads",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/bing-ads"
}
Expand Up @@ -441,7 +441,7 @@
- sourceDefinitionId: 47f25999-dd5e-4636-8c39-e7cea2453331
name: Bing Ads
dockerRepository: airbyte/source-bing-ads
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/sources/bing-ads
- sourceDefinitionId: 59c5501b-9f95-411e-9269-7143c939adbd
name: BigCommerce
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-bing-ads/Dockerfile
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-bing-ads
Expand Up @@ -12,7 +12,7 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# connector doesn't have incremental streams
# incremental test doesn't work if a single stream has multiple states
#incremental:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
Expand Down
44 changes: 44 additions & 0 deletions airbyte-integrations/connectors/source-bing-ads/bootstrap.md
@@ -0,0 +1,44 @@

## Core streams

Bing Ads is a SOAP-based API. The connector is implemented with the [Bing Ads Python SDK](https://github.com/BingAds/BingAds-Python-SDK).

The connector has the following core streams, all of which support full refresh only:
* [Account](https://docs.microsoft.com/en-us/advertising/customer-management-service/advertiseraccount?view=bingads-13)
* [Campaign](https://docs.microsoft.com/en-us/advertising/campaign-management-service/campaign?view=bingads-13)
* [AdGroup](https://docs.microsoft.com/en-us/advertising/campaign-management-service/getadgroupsbycampaignid?view=bingads-13)
* [Ad](https://docs.microsoft.com/en-us/advertising/campaign-management-service/getadsbyadgroupid?view=bingads-13)


## Report streams

The connector also has report streams, which support incremental sync:

* [AccountPerformanceReport](https://docs.microsoft.com/en-us/advertising/reporting-service/accountperformancereportrequest?view=bingads-13)
* [AdPerformanceReport](https://docs.microsoft.com/en-us/advertising/reporting-service/adperformancereportrequest?view=bingads-13)
* [AdGroupPerformanceReport](https://docs.microsoft.com/en-us/advertising/reporting-service/adgroupperformancereportrequest?view=bingads-13)
* [CampaignPerformanceReport](https://docs.microsoft.com/en-us/advertising/reporting-service/campaignperformancereportrequest?view=bingads-13)
* [BudgetSummaryReport](https://docs.microsoft.com/en-us/advertising/reporting-service/budgetsummaryreportrequest?view=bingads-13)
* [KeywordPerformanceReport](https://docs.microsoft.com/en-us/advertising/reporting-service/keywordperformancereportrequest?view=bingads-13)

Pulling report data takes two separate requests:

* [First](https://docs.microsoft.com/en-us/advertising/reporting-service/submitgeneratereport?view=bingads-13): submit a request for the report you need.

* [Second](https://docs.microsoft.com/en-us/advertising/reporting-service/pollgeneratereport?view=bingads-13): poll for the actual data. The report download timeout is 5 minutes.
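
The two-request flow can be sketched roughly as follows. This is a minimal illustration, not the connector's code: `fetch_report`, `submit`, and `poll` are hypothetical stand-ins for the SDK calls, and the 15-second interval is assumed from the client's `report_poll_interval` of 15000 ms.

```python
import time
from typing import Callable, Optional

REPORT_TIMEOUT_SECONDS = 5 * 60  # the 5-minute download timeout
POLL_INTERVAL_SECONDS = 15       # assumed from report_poll_interval = 15000 ms


def fetch_report(
    submit: Callable[[], str],
    poll: Callable[[str], Optional[str]],
    timeout: float = REPORT_TIMEOUT_SECONDS,
    interval: float = POLL_INTERVAL_SECONDS,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Submit a report request, then poll until a download URL is returned.

    `submit` and `poll` are hypothetical callables standing in for the
    SubmitGenerateReport and PollGenerateReport operations.
    """
    request_id = submit()  # request 1: ask the service to build the report
    deadline = clock() + timeout
    while True:
        url = poll(request_id)  # request 2: check whether the report is ready
        if url is not None:
            return url
        if clock() + interval > deadline:
            raise TimeoutError(f"report {request_id} not ready after {timeout} s")
        sleep(interval)


# Stand-in service: the report becomes ready on the third poll.
answers = iter([None, None, "https://example.invalid/report.csv"])
report_url = fetch_report(lambda: "req-1", lambda _id: next(answers), sleep=lambda _s: None)
```

Passing `sleep` and `clock` as parameters is only a convenience for testing the loop without waiting; in the connector itself the SDK's `ReportingServiceManager` handles polling.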

Initially, all fields in report streams have string values. The connector uses the `reports.REPORT_FIELD_TYPES` collection to convert values to numeric types where possible.
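
A minimal sketch of this kind of conversion, under stated assumptions: the mapping below is a hypothetical excerpt (the real contents of `reports.REPORT_FIELD_TYPES` are not shown here), and `cast_row` is an illustrative helper, not a function from the connector.

```python
from typing import Any, Dict, Mapping

# Hypothetical excerpt; field names and type labels are assumptions.
REPORT_FIELD_TYPES: Mapping[str, str] = {
    "Impressions": "integer",
    "Clicks": "integer",
    "Spend": "number",
    "AccountName": "string",
}


def cast_row(
    row: Mapping[str, Any],
    field_types: Mapping[str, str] = REPORT_FIELD_TYPES,
) -> Dict[str, Any]:
    """Convert string report values to numbers where the mapping allows it."""
    casted: Dict[str, Any] = {}
    for field, value in row.items():
        kind = field_types.get(field, "string")  # unknown fields stay strings
        try:
            if kind == "integer":
                casted[field] = int(value)
            elif kind == "number":
                casted[field] = float(value)
            else:
                casted[field] = value
        except (TypeError, ValueError):
            # "Where possible": keep the original value if it cannot be parsed.
            casted[field] = value
    return casted


row = {"Impressions": "120", "Clicks": "7", "Spend": "3.50", "AccountName": "Demo"}
typed_row = cast_row(row)
```

Falling back to the original value keeps a sync from failing on an unexpected cell, at the cost of mixed types in that column.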

The connector uses the `reports_start_date` config value as the start of the initial reports sync and the current date as the end date.

The connector has a `report_aggregation` config option that selects the aggregation time range. Available options: Daily, Hourly, Weekly, Monthly.
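
As a rough illustration of how these two settings could be read from a config, here is a sketch that uses only the standard library. `report_window` is a hypothetical helper, the `YYYY-MM-DD` date format is assumed from the sample config, and the connector itself parses the date with `pendulum` rather than `strptime`.

```python
from datetime import datetime, timezone
from typing import Any, Mapping, Optional, Tuple

AGGREGATIONS = ("Daily", "Hourly", "Weekly", "Monthly")


def report_window(
    config: Mapping[str, Any],
    now: Optional[datetime] = None,
) -> Tuple[datetime, datetime, str]:
    """Return (start, end, aggregation) for an initial reports sync."""
    aggregation = config["report_aggregation"]["value"]
    if aggregation not in AGGREGATIONS:
        raise ValueError(f"unsupported report_aggregation: {aggregation!r}")

    # Assumed format: YYYY-MM-DD, interpreted as midnight UTC.
    start = datetime.strptime(config["reports_start_date"], "%Y-%m-%d")
    start = start.replace(tzinfo=timezone.utc)

    end = now if now is not None else datetime.now(timezone.utc)
    if start > end:
        raise ValueError("reports_start_date must not be in the future")
    return start, end, aggregation


sample_config = {
    "reports_start_date": "2018-11-13",
    "report_aggregation": {"value": "Monthly"},
}
window = report_window(sample_config, now=datetime(2021, 9, 9, tzinfo=timezone.utc))
```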

## Request caching

Request caching is based on the [VCR.py](https://vcrpy.readthedocs.io/en/latest/) library.

The connector caches requests for these streams:

* Account
* Campaign
* AdGroup
Expand Up @@ -35,6 +35,66 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "campaign_performance_report",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "budget_summary_report",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["Date"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ad_performance_report",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ad_group_performance_report",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "account_performance_report",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "keyword_performance_report",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"]
},
"sync_mode": "incremental",
"cursor_field": ["TimePeriod"],
"destination_sync_mode": "append"
}
]
}
@@ -1,9 +1,11 @@
{
"accounts": { "type": "all" },
"accounts": { "selection_strategy": "all" },
"user_id": "2222",
"customer_id": "1111",
"developer_token": "asgag4gwag3",
"refresh_token": "as2Ggas23gsa236gasgaskjfhas7i8ygf78as7osa7gy87asg8as7tg6as",
"client_secret": "1234",
"client_id": "123"
"client_id": "123",
"reports_start_date": "2018-11-13",
"report_aggregation": { "value": "Monthly" }
}
@@ -0,0 +1,22 @@
{
"keyword_performance_report": {
"180278106": {
"TimePeriod": 1627820152
}
},
"budget_summary_report": {
"180278106": {
"Date": 1627800152
}
},
"ad_performance_report": {
"180278106": {
"TimePeriod": 1627795152
}
},
"campaign_performance_report": {
"180278106": {
"TimePeriod": 1727810152
}
}
}
Expand Up @@ -23,15 +23,17 @@
#

import sys
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from typing import Any, Iterator, Mapping, Optional

import backoff
import pendulum
from airbyte_cdk.logger import AirbyteLogger
from bingads.authorization import AuthorizationData, OAuthTokens, OAuthWebAuthCodeGrant
from bingads.service_client import ServiceClient
from bingads.util import errorcode_of_exception
from bingads.v13.reporting.reporting_service_manager import ReportingServiceManager
from suds import WebFault, sudsobject


Expand All @@ -42,11 +44,15 @@ class Client:
# retry on: rate limit errors, auth token expiration, internal errors
# https://docs.microsoft.com/en-us/advertising/guides/services-protocol?view=bingads-13#throttling
# https://docs.microsoft.com/en-us/advertising/guides/operation-error-codes?view=bingads-13
retry_on_codes: Iterator[int] = [117, 207, 4204, 109, 0]
retry_on_codes: Iterator[str] = ["117", "207", "4204", "109", "0"]
max_retries: int = 3
# A backoff factor to apply between attempts after the second try
# {retry_factor} * (2 ** ({number of total retries} - 1))
retry_factor: int = 15
# environments supported by Microsoft Advertising: sandbox, production
environment: str = "production"
# The time interval in milliseconds between two status polling attempts.
report_poll_interval: int = 15000

def __init__(
self,
Expand All @@ -55,6 +61,8 @@ def __init__(
client_secret: str,
client_id: str,
refresh_token: str,
reports_start_date: str,
report_aggregation: Mapping[str, str],
**kwargs: Mapping[str, Any],
) -> None:
self.authorization_data: Mapping[str, AuthorizationData] = {}
Expand All @@ -67,8 +75,10 @@ def __init__(
self.refresh_token = refresh_token
self.customer_id = customer_id
self.developer_token = developer_token
self.report_aggregation = report_aggregation["value"]

self.oauth: OAuthTokens = self._get_access_token()
self.reports_start_date = pendulum.parse(reports_start_date).astimezone(tz=timezone.utc)

@lru_cache(maxsize=None)
def _get_auth_data(self, account_id: Optional[str] = None) -> AuthorizationData:
Expand All @@ -95,18 +105,19 @@ def is_token_expiring(self) -> bool:
return False if token_updated_expires_in > self.refresh_token_safe_delta else True

def should_retry(self, error: WebFault) -> bool:
error_code = errorcode_of_exception(error)
error_code = str(errorcode_of_exception(error))
give_up = error_code not in self.retry_on_codes
if give_up:
self.logger.info(f"Giving up for returned error code: {error_code}")
self.logger.error(f"Giving up for returned error code: {error_code}. Error details: {self._get_error_message(error)}")
return give_up

def _get_error_message(self, error: WebFault) -> str:
return str(self.asdict(error.fault)) if hasattr(error, "fault") else str(error)

def log_retry_attempt(self, details: Mapping[str, Any]) -> None:
_, exc, _ = sys.exc_info()
error = self.asdict(exc.fault) if hasattr(exc, "fault") else exc

self.logger.info(
f"Caught retryable error: {str(error)} after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
f"Caught retryable error: {self._get_error_message(exc)} after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
)

def request(self, **kwargs: Mapping[str, Any]) -> Mapping[str, Any]:
Expand All @@ -122,18 +133,23 @@ def request(self, **kwargs: Mapping[str, Any]) -> Mapping[str, Any]:

def _request(
self,
service_name: str,
service_name: Optional[str],
operation_name: str,
account_id: Optional[str],
params: Mapping[str, Any],
is_report_service: bool = False,
) -> Mapping[str, Any]:
"""
Executes appropriate Service Operation on Bing Ads API
"""
if self.is_token_expiring():
self.oauth = self._get_access_token()

service = self.get_service(service_name=service_name, account_id=account_id)
if is_report_service:
service = self._get_reporting_service(account_id=account_id)
else:
service = self.get_service(service_name=service_name, account_id=account_id)

return getattr(service, operation_name)(**params)

@lru_cache(maxsize=None)
Expand All @@ -146,8 +162,18 @@ def get_service(
service=service_name,
version=self.api_version,
authorization_data=self._get_auth_data(account_id),
# environments supported by Microsoft Advertising: sandbox, production
environment="production",
environment=self.environment,
)

@lru_cache(maxsize=None)
def _get_reporting_service(
self,
account_id: Optional[str] = None,
) -> ServiceClient:
return ReportingServiceManager(
authorization_data=self._get_auth_data(account_id),
poll_interval_in_milliseconds=self.report_poll_interval,
environment=self.environment,
)

@classmethod
Expand Down