Skip to content

Commit

Permalink
Source: Amazon Ads - Improve error handling for attribution_report(s) streams (#25885)
Browse files Browse the repository at this point in the history

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr committed May 16, 2023
1 parent fc6165e commit 1cbd925
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8688,7 +8688,7 @@
"sourceDefinitionId": "c6b0a29e-1da9-4512-9002-7bfd0cba2246",
"name": "Amazon Ads",
"dockerRepository": "airbyte/source-amazon-ads",
"dockerImageTag": "1.0.4",
"dockerImageTag": "1.0.5",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/amazon-ads",
"icon": "amazonads.svg",
"sourceType": "api",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
- name: Amazon Ads
sourceDefinitionId: c6b0a29e-1da9-4512-9002-7bfd0cba2246
dockerRepository: airbyte/source-amazon-ads
dockerImageTag: 1.0.4
dockerImageTag: 1.0.5
documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-ads
icon: amazonads.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-amazon-ads:1.0.4"
- dockerImage: "airbyte/source-amazon-ads:1.0.5"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/amazon-ads"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]



LABEL io.airbyte.version=1.0.4
LABEL io.airbyte.version=1.0.5
LABEL io.airbyte.name=airbyte/source-amazon-ads
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: c6b0a29e-1da9-4512-9002-7bfd0cba2246
dockerImageTag: 1.0.4
dockerImageTag: 1.0.5
dockerRepository: airbyte/source-amazon-ads
githubIssueLabel: source-amazon-ads
icon: amazonads.svg
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-amazon-ads/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"pytest~=6.1",
"pytest-mock~=3.7.0",
"jsonschema~=3.2.0",
"responses~=0.13.3",
"responses~=0.23.1",
"freezegun~=1.2.0",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, Iterable, Mapping, MutableMapping, Optional
from typing import Any, Iterable, List, Mapping, Optional

import pendulum
import requests
from source_amazon_ads.schemas import AttributionReportModel, Profile
from airbyte_cdk.models import SyncMode
from requests.exceptions import HTTPError
from source_amazon_ads.schemas import AttributionReportModel
from source_amazon_ads.streams.common import AmazonAdsStream

BRAND_REFERRAL_BONUS = "brb_bonus_amount"
Expand Down Expand Up @@ -70,40 +72,51 @@ class AttributionReport(AmazonAdsStream):

def __init__(self, config: Mapping[str, Any], *args, **kwargs):
    """Capture the optional report start date from the connector config."""
    # Optional lower bound for the report window; falsy/absent means each
    # slice defaults to a one-day lookback (see stream_slices).
    self._start_date = config.get("start_date")
    # NOTE(review): these request-window fields look superseded by the
    # per-slice startDate/endDate values — confirm nothing still reads
    # them and remove.
    self._req_start_date = ""
    self._req_end_date = ""

    super().__init__(config, *args, **kwargs)

def _set_dates(self, profile: Profile):
new_start_date = pendulum.now(tz=profile.timezone).subtract(days=1).date()
new_end_date = pendulum.now(tz=profile.timezone).date()

if self._start_date:
new_start_date = max(self._start_date, new_end_date.subtract(days=self.REPORTING_PERIOD))

self._req_start_date = new_start_date.format(self.REPORT_DATE_FORMAT)
self._req_end_date = new_end_date.format(self.REPORT_DATE_FORMAT)

@property
def http_method(self) -> str:
    """HTTP verb for report requests; the report is requested via POST with a JSON body."""
    return "POST"

def read_records(self, *args, **kvargs) -> Iterable[Mapping[str, Any]]:
"""
Iterate through self._profiles list and send read all records for each profile.
"""
def path(self, **kvargs) -> str:
    """API endpoint path for the attribution report request."""
    return "/attribution/report"

def stream_slices(
    self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
    """Yield one slice per profile, carrying the report window for that profile.

    The window is computed in the profile's own timezone and defaults to
    [yesterday, today]. A configured start_date widens the window, clamped
    so it never opens more than REPORTING_PERIOD days before the end date.

    This span of the diff view interleaved the removed pre-change
    read_records/request_headers code with the new implementation; this is
    the clean committed version.
    """
    for profile in self._profiles:
        start_date = pendulum.now(tz=profile.timezone).subtract(days=1).date()
        end_date = pendulum.now(tz=profile.timezone).date()
        if self._start_date:
            # Clamp: never request more than REPORTING_PERIOD days of history.
            start_date = max(self._start_date, end_date.subtract(days=self.REPORTING_PERIOD))

        yield {
            "profileId": profile.profileId,
            "startDate": start_date.format(self.REPORT_DATE_FORMAT),
            "endDate": end_date.format(self.REPORT_DATE_FORMAT),
        }

def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: List[str] = None,
    stream_slice: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
    """Read records for one profile slice, skipping unauthorized profiles.

    A 400 response whose body says the profile is not authorized to use
    Amazon Attribution is logged as a warning and swallowed (the slice
    yields no records); every other HTTP error is re-raised.
    """
    try:
        yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
    except HTTPError as e:
        if e.response.status_code == 400:
            # Parse defensively: a non-JSON or message-less 400 body must not
            # mask the original HTTPError with a JSONDecodeError/KeyError.
            try:
                message = e.response.json().get("message")
            except ValueError:
                message = None
            if message == "This profileID is not authorized to use Amazon Attribution":
                self.logger.warning(f"This profileID {stream_slice['profileId']} is not authorized to use Amazon Attribution")
                return
        raise  # bare raise preserves the original traceback

def request_headers(
    self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
    """Return the base headers plus the per-profile scope header for this slice."""
    scoped_headers = super().request_headers(stream_state, stream_slice, next_page_token)
    scoped_headers["Amazon-Advertising-API-Scope"] = str(stream_slice["profileId"])
    return scoped_headers

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
Expand All @@ -112,21 +125,19 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
if next_page_token:
return {self._next_page_token_field: next_page_token}

def path(self, **kvargs) -> str:
return "/attribution/report"

def request_body_json(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Optional[Mapping]:

body = {
"reportType": self.report_type,
"count": self.page_size,
"metrics": self.metrics,
"startDate": self._req_start_date,
"endDate": self._req_end_date,
"startDate": stream_slice["startDate"],
"endDate": stream_slice["endDate"],
self._next_page_token_field: "",
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,3 @@ def _internal(report_type: str):
return json.dumps(responses[report_type])

return _internal


@fixture
def attribution_report_bad_response():
return "bad response"
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@

import json

import pendulum
import pytest
import requests
import responses
from airbyte_cdk.models import SyncMode
from freezegun import freeze_time
from jsonschema import validate
from source_amazon_ads import SourceAmazonAds
from source_amazon_ads.schemas.profile import AccountInfo, Profile
from source_amazon_ads.streams import AttributionReportProducts

from .utils import read_full_refresh


def setup_responses(
Expand All @@ -19,7 +25,7 @@ def setup_responses(
responses.add(
responses.POST,
"https://api.amazon.com/auth/o2/token",
json={"access_token": "alala", "expires_in": 10},
json={"access_token": "access_token", "expires_in": 10},
)
if profiles_response:
responses.add(
Expand All @@ -35,11 +41,6 @@ def setup_responses(
)


def get_all_stream_records(stream):
records = stream.read_records(SyncMode.full_refresh)
return [r for r in records]


def get_stream_by_name(streams, stream_name):
for stream in streams:
if stream.name == stream_name:
Expand Down Expand Up @@ -70,8 +71,8 @@ def test_attribution_report_schema(config, profiles_response, attribution_report
attribution_report_stream = get_stream_by_name(streams, stream_name)
schema = attribution_report_stream.get_json_schema()

profile_records = get_all_stream_records(profile_stream)
attribution_records = get_all_stream_records(attribution_report_stream)
profile_records = list(read_full_refresh(profile_stream))
attribution_records = list(read_full_refresh(attribution_report_stream))
assert len(attribution_records) == len(profile_records) * len(json.loads(attribution_report_response(report_type)).get("reports"))

for record in attribution_records:
Expand Down Expand Up @@ -118,31 +119,40 @@ def _callback(request: requests.PreparedRequest):
callback=_callback,
)

attribution_records = get_all_stream_records(attribution_report_stream)
attribution_records = list(read_full_refresh(attribution_report_stream))

# request should be called 2 times for a single profile
assert len(attribution_records) == 2 * len(attribution_data.get("reports"))


@pytest.mark.parametrize(
("stream_name", "report_type"),
[
("attribution_report_products", "PRODUCTS"),
("attribution_report_performance_adgroup", "PERFORMANCE_ADGROUP"),
("attribution_report_performance_campaign", "PERFORMANCE_CAMPAIGN"),
("attribution_report_performance_creative", "PERFORMANCE_CREATIVE"),
],
)
@responses.activate
def test_attribution_report_no_data(config, profiles_response, attribution_report_bad_response, stream_name, report_type):
# custom start date
config["start_date"] = "2022-09-03"

setup_responses(profiles_response=profiles_response, attribution_report_response=attribution_report_bad_response)

source = SourceAmazonAds()
streams = source.streams(config)

attribution_report_stream = get_stream_by_name(streams, stream_name)
attribution_records = get_all_stream_records(attribution_report_stream)
assert len(attribution_records) == 0
@freeze_time("2022-05-15 12:00:00")
def test_attribution_report_slices(config):
    """One slice per profile; start_date widens the window but is clamped to the reporting period."""
    profiles = [
        Profile(profileId=1, timezone="America/Los_Angeles", accountInfo=AccountInfo(id="1", type="seller", marketplaceStringId="")),
        Profile(profileId=2, timezone="America/Los_Angeles", accountInfo=AccountInfo(id="1", type="seller", marketplaceStringId="")),
    ]

    def compute_slices():
        report_stream = AttributionReportProducts(config, profiles=profiles)
        return list(report_stream.stream_slices(sync_mode=SyncMode.full_refresh))

    def expected_slices(start):
        return [{"profileId": pid, "startDate": start, "endDate": "20220515"} for pid in (1, 2)]

    # No start_date configured: default one-day lookback window.
    assert compute_slices() == expected_slices("20220514")

    # Recent start_date: the window opens at the configured date.
    config["start_date"] = pendulum.from_format("2022-05-01", "YYYY-MM-DD").date()
    assert compute_slices() == expected_slices("20220501")

    # Old start_date: clamped to the reporting period (evidently 90 days) before end date.
    config["start_date"] = pendulum.from_format("2022-01-01", "YYYY-MM-DD").date()
    assert compute_slices() == expected_slices("20220214")
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str,
stream_state.update(stream_instance.state)


def read_full_refresh(stream_instance: Stream):
    """Yield every record of a stream by reading all of its full-refresh slices in order."""
    for stream_slice in stream_instance.stream_slices(sync_mode=SyncMode.full_refresh):
        yield from stream_instance.read_records(stream_slice=stream_slice, sync_mode=SyncMode.full_refresh)


def command_check(source: Source, config):
logger = mock.MagicMock()
connector_config, _ = split_config(config)
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/amazon-ads.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ Information about expected report generation waiting time you may find [here](ht

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------|
| 1.0.4 | 2023-05-04 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Add availability strategy for basic streams (not including report streams) |
| 1.0.5 | 2023-05-08 | [25885](https://github.com/airbytehq/airbyte/pull/25885) | Improve error handling for attribution_report(s) streams |
| 1.0.4 | 2023-05-04 | [25792](https://github.com/airbytehq/airbyte/pull/25792) | Add availability strategy for basic streams (not including report streams) |
| 1.0.3 | 2023-04-13 | [25146](https://github.com/airbytehq/airbyte/pull/25146) | Validate pk for reports when expected pk is not returned |
| 1.0.2 | 2023-02-03 | [22355](https://github.com/airbytehq/airbyte/pull/22355) | Migrate `products_report` stream to API v3 |
| 1.0.1 | 2022-11-01 | [18677](https://github.com/airbytehq/airbyte/pull/18677) | Add optional config report_record_types |
Expand Down

0 comments on commit 1cbd925

Please sign in to comment.