Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source Bing Ads: implement OAuth2.0 support, remove redirect_uri, change Account ID to User ID in spec #12937

Merged
merged 21 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
documentationUrl: https://docs.airbyte.io/integrations/sources/bing-ads
icon: bingads.svg
sourceType: api
releaseStage: alpha
releaseStage: beta
bazarnov marked this conversation as resolved.
Show resolved Hide resolved
- name: Braintree
sourceDefinitionId: 63cea06f-1c75-458d-88fe-ad48c7cb27fd
dockerRepository: airbyte/source-braintree
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-bing-ads/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/source-bing-ads
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-bing-ads/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ Customize `acceptance-test-config.yml` file to configure tests. See [Source Acce
If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.
To run your integration tests with acceptance tests, from the connector root, run
```
python -m pytest integration_tests -p integration_tests.acceptance
docker build . --no-cache -t airbyte/source-bing-ads:dev \
&& python -m pytest -p source_acceptance_test.plugin
```
To run your integration tests with docker

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ tests:
spec:
- spec_path: "source_bing_ads/spec.json"
connection:
- config_path: "secrets/config_old.json"
girarda marked this conversation as resolved.
Show resolved Hide resolved
status: "succeed"
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
{
"accounts": { "selection_strategy": "all" },
"user_id": "2222",
"customer_id": "1111",
"developer_token": "asgag4gwag3",
"refresh_token": "as2Ggas23gsa236gasgaskjfhas7i8ygf78as7osa7gy87asg8as7tg6as",
"client_secret": "1234",
"client_id": "123",
"tenant_id": "common",
"redirect_uri": "",
"credentials": {
"auth_method": "oauth2.0",
"developer_token": "asgag4gwag3",
"refresh_token": "as2Ggas23gsa236gasgaskjfhas7i8ygf78as7osa7gy87asg8as7tg6as",
"client_secret": "1234",
"client_id": "123"
},
"reports_start_date": "2018-11-13",
"hourly_reports": true,
"hourly_reports": false,
"daily_reports": false,
"weekly_reports": false,
"monthly_reports": true
"weekly_reports": true,
"monthly_reports": true,
"tenant_id": "common"
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-bing-ads/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk", "bingads~=13.0.11", "vcrpy==4.1.1", "backoff==1.10.0", "pendulum==2.1.2"]
MAIN_REQUIREMENTS = ["airbyte-cdk", "bingads~=13.0.13", "vcrpy==4.1.1", "backoff==1.10.0", "pendulum==2.1.2"]

TEST_REQUIREMENTS = [
"pytest~=6.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,44 +36,64 @@ class Client:

def __init__(
self,
developer_token: str,
customer_id: str,
client_secret: str,
client_id: str,
tenant_id: str,
redirect_uri: str,
refresh_token: str,
reports_start_date: str,
hourly_reports: bool,
daily_reports: bool,
weekly_reports: bool,
monthly_reports: bool,
credentials: dict = None,
client_id: str = None, # deprecated
client_secret: str = None, # deprecated
developer_token: str = None, # deprecated
refresh_token: str = None, # deprecated
**kwargs: Mapping[str, Any],
) -> None:
self.authorization_data: Mapping[str, AuthorizationData] = {}
self.authentication = OAuthWebAuthCodeGrant(
client_id,
client_secret,
redirect_uri,
tenant=tenant_id,
)

self.refresh_token = refresh_token
self.customer_id = customer_id
self.developer_token = developer_token
self.refresh_token = credentials["refresh_token"] if credentials else refresh_token
self.developer_token = credentials["developer_token"] if credentials else developer_token
self.hourly_reports = hourly_reports
self.daily_reports = daily_reports
self.weekly_reports = weekly_reports
self.monthly_reports = monthly_reports

self.client_id = client_id # deprecated
self.client_secret = client_secret # deprecated

self.authentication = self._get_auth_client(credentials, tenant_id)
self.oauth: OAuthTokens = self._get_access_token()
self.reports_start_date = pendulum.parse(reports_start_date).astimezone(tz=timezone.utc)

def _get_auth_client(self, credentials: dict, tenant_id: str) -> OAuthWebAuthCodeGrant:

# support the deprecated old input configuration
if self.client_id or self.client_secret:
auth_creds = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"redirection_uri": "", # should be empty string
"tenant": tenant_id,
}
return OAuthWebAuthCodeGrant(**auth_creds)

# https://github.com/BingAds/BingAds-Python-SDK/blob/e7b5a618e87a43d0a5e2c79d9aa4626e208797bd/bingads/authorization.py#L390
auth_creds = {
"client_id": credentials["client_id"],
"client_secret": None,
"redirection_uri": "", # should be empty string
"tenant": tenant_id,
}
if credentials["auth_method"] == "private_client":
# the `client_secret` should be provided for `non-public clients` ONLY
# https://docs.microsoft.com/en-us/advertising/guides/authentication-oauth-get-tokens?view=bingads-13#request-accesstoken
auth_creds["client_secret"] = credentials["client_secret"]
return OAuthWebAuthCodeGrant(**auth_creds)

@lru_cache(maxsize=None)
def _get_auth_data(self, account_id: Optional[str] = None) -> AuthorizationData:
def _get_auth_data(self, customer_id: str = None, account_id: Optional[str] = None) -> AuthorizationData:
return AuthorizationData(
account_id=account_id,
customer_id=self.customer_id,
customer_id=customer_id,
developer_token=self.developer_token,
authentication=self.authentication,
)
Expand Down Expand Up @@ -124,6 +144,7 @@ def _request(
self,
service_name: Optional[str],
operation_name: str,
customer_id: Optional[str],
account_id: Optional[str],
params: Mapping[str, Any],
is_report_service: bool = False,
Expand All @@ -135,32 +156,34 @@ def _request(
self.oauth = self._get_access_token()

if is_report_service:
service = self._get_reporting_service(account_id=account_id)
service = self._get_reporting_service(customer_id=customer_id, account_id=account_id)
else:
service = self.get_service(service_name=service_name, account_id=account_id)
service = self.get_service(service_name=service_name, customer_id=customer_id, account_id=account_id)

return getattr(service, operation_name)(**params)

@lru_cache(maxsize=None)
def get_service(
self,
service_name: str,
customer_id: str = None,
account_id: Optional[str] = None,
) -> ServiceClient:
return ServiceClient(
service=service_name,
version=self.api_version,
authorization_data=self._get_auth_data(account_id),
authorization_data=self._get_auth_data(customer_id, account_id),
environment=self.environment,
)

@lru_cache(maxsize=None)
def _get_reporting_service(
self,
customer_id: Optional[str] = None,
account_id: Optional[str] = None,
) -> ServiceClient:
return ReportingServiceManager(
authorization_data=self._get_auth_data(account_id),
authorization_data=self._get_auth_data(customer_id, account_id),
poll_interval_in_milliseconds=self.report_poll_interval,
environment=self.environment,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ def get_updated_state(
)
return current_stream_state

def send_request(self, params: Mapping[str, Any], account_id: str) -> _RowReport:
def send_request(self, params: Mapping[str, Any], customer_id: str, account_id: str) -> _RowReport:
request_kwargs = {
"service_name": None,
"customer_id": customer_id,
"account_id": account_id,
"operation_name": self.operation_name,
"is_report_service": True,
Expand Down Expand Up @@ -262,6 +263,6 @@ def stream_slices(
**kwargs: Mapping[str, Any],
) -> Iterable[Optional[Mapping[str, Any]]]:
for account in source_bing_ads.source.Accounts(self.client, self.config).read_records(SyncMode.full_refresh):
yield {"account_id": account["Id"]}
yield {"account_id": account["Id"], "customer_id": account["ParentCustomerId"]}

yield from []
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"type": ["null", "number"]
},
"PauseReason": {
"type": ["null", "string"]
"type": ["null", "number"]
bazarnov marked this conversation as resolved.
Show resolved Hide resolved
},
"PaymentMethodId": {
"type": ["null", "number"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from bingads.service_client import ServiceClient
from bingads.v13.reporting.reporting_service_manager import ReportingServiceManager
from source_bing_ads.cache import VcrCache
from source_bing_ads.client import Client
from source_bing_ads.reports import ReportsMixin
Expand Down Expand Up @@ -61,6 +63,14 @@ def additional_fields(self) -> Optional[str]:
"""
pass

@property
def _service(self) -> Union[ServiceClient, ReportingServiceManager]:
return self.client.get_service(service_name=self.service_name)

@property
def _user_id(self) -> int:
return self._service.GetUser().User.Id

def next_page_token(self, response: sudsobject.Object, **kwargs: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
"""
Default method for streams that don't support pagination
Expand All @@ -73,24 +83,20 @@ def parse_response(self, response: sudsobject.Object, **kwargs) -> Iterable[Mapp

yield from []

def send_request(self, params: Mapping[str, Any], account_id: str = None) -> Mapping[str, Any]:
def send_request(self, params: Mapping[str, Any], customer_id: str, account_id: str = None) -> Mapping[str, Any]:
request_kwargs = {
"service_name": self.service_name,
"customer_id": customer_id,
"account_id": account_id,
"operation_name": self.operation_name,
"params": params,
}
if not self.use_cache:
return self.client.request(**request_kwargs)

with CACHE.use_cassette():
return self.client.request(**request_kwargs)

def get_account_id(self, stream_slice: Mapping[str, Any] = None) -> Optional[str]:
"""
Fetches account_id from slice object
"""
return str(stream_slice.get("account_id")) if stream_slice else None
request = self.client.request(**request_kwargs)
if self.use_cache:
with CACHE.use_cassette():
return request
else:
return request

def read_records(
self,
Expand All @@ -101,14 +107,17 @@ def read_records(
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
next_page_token = None
account_id = self.get_account_id(stream_slice)
account_id = str(stream_slice.get("account_id")) if stream_slice else None
customer_id = str(stream_slice.get("customer_id")) if stream_slice else None

while True:
params = self.request_params(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, account_id=account_id
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
account_id=account_id,
)

response = self.send_request(params, account_id=account_id)
response = self.send_request(params, customer_id=customer_id, account_id=account_id)
for record in self.parse_response(response):
yield record

Expand Down Expand Up @@ -154,21 +163,12 @@ def request_params(
{
"Field": "UserId",
"Operator": "Equals",
"Value": self.config["user_id"],
"Value": self._user_id,
}
]
}

if self.config["accounts"]["selection_strategy"] == "subset":
predicates["Predicate"].append(
{
"Field": "AccountId",
"Operator": "In",
"Value": ",".join(self.config["accounts"]["ids"]),
}
)

paging = self.client.get_service(service_name=self.service_name).factory.create("ns5:Paging")
paging = self._service.factory.create("ns5:Paging")
paging.Index = next_page_token or 0
paging.Size = self.page_size_limit
return {
Expand Down Expand Up @@ -213,7 +213,7 @@ def stream_slices(
**kwargs: Mapping[str, Any],
) -> Iterable[Optional[Mapping[str, Any]]]:
for account in Accounts(self.client, self.config).read_records(SyncMode.full_refresh):
yield {"account_id": account["Id"]}
yield {"account_id": account["Id"], "customer_id": account["ParentCustomerId"]}

yield from []

Expand Down Expand Up @@ -247,8 +247,10 @@ def stream_slices(
) -> Iterable[Optional[Mapping[str, Any]]]:
campaigns = Campaigns(self.client, self.config)
for account in Accounts(self.client, self.config).read_records(SyncMode.full_refresh):
for campaign in campaigns.read_records(sync_mode=SyncMode.full_refresh, stream_slice={"account_id": account["Id"]}):
yield {"campaign_id": campaign["Id"], "account_id": account["Id"]}
for campaign in campaigns.read_records(
sync_mode=SyncMode.full_refresh, stream_slice={"account_id": account["Id"], "customer_id": account["ParentCustomerId"]}
):
yield {"campaign_id": campaign["Id"], "account_id": account["Id"], "customer_id": account["ParentCustomerId"]}

yield from []

Expand Down Expand Up @@ -294,7 +296,7 @@ def stream_slices(
ad_groups = AdGroups(self.client, self.config)
for slice in ad_groups.stream_slices(sync_mode=SyncMode.full_refresh):
for ad_group in ad_groups.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice):
yield {"ad_group_id": ad_group["Id"], "account_id": slice["account_id"]}
yield {"ad_group_id": ad_group["Id"], "account_id": slice["account_id"], "customer_id": slice["customer_id"]}
yield from []


Expand Down Expand Up @@ -570,21 +572,13 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
try:
client = Client(**config)
account_ids = {str(account["Id"]) for account in Accounts(client, config).read_records(SyncMode.full_refresh)}

if config["accounts"]["selection_strategy"] == "subset":
config_account_ids = set(config["accounts"]["ids"])
if not config_account_ids.issubset(account_ids):
raise Exception(f"Accounts with ids: {config_account_ids.difference(account_ids)} not found on this user.")
elif config["accounts"]["selection_strategy"] == "all":
if not account_ids:
raise Exception("You don't have accounts assigned to this user.")
if account_ids:
return True, None
else:
raise Exception("Incorrect account selection strategy.")
raise Exception("You don't have accounts assigned to this user.")
except Exception as error:
return False, error

return True, None

def get_report_streams(self, aggregation_type: str) -> List[Stream]:
return [
globals()[f"AccountPerformanceReport{aggregation_type}"],
Expand Down
Loading