Skip to content

Commit

Permalink
🎉 Source Amazon Ads : Add attribution reports (#16342)
Browse files Browse the repository at this point in the history
* use data field for json response

* add attribution reports

* update changelog

* add attribution report integration test

* clean up expected_records
add empty streams to acceptance test config

* handle exception for profile

* update tests

Co-authored-by: Yiyang Li <yiyangli2010@gmail.com>
Co-authored-by: Harshith Mullapudi <harshithmullapudi@gmail.com>
Co-authored-by: Sajarin <sajarindider@gmail.com>
  • Loading branch information
4 people committed Oct 3, 2022
1 parent 06fafa8 commit 454f84d
Show file tree
Hide file tree
Showing 13 changed files with 555 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: ["sponsored_brands_campaigns", "sponsored_brands_ad_groups", "sponsored_brands_keywords"]
expect_records:
path: "integration_tests/expected_records.txt"
extra_fields: no
exact_order: no
extra_records: no
empty_streams:
[
"sponsored_brands_ad_groups",
"sponsored_brands_campaigns",
"sponsored_brands_keywords",
"sponsored_product_keywords",
"sponsored_product_negative_keywords",
"sponsored_product_targetings",
"attribution_report_performance_creative",
"attribution_report_performance_adgroup",
"attribution_report_products",
"attribution_report_performance_campaign",
]
- config_path: "secrets/config_report.json"
configured_catalog_path: "integration_tests/configured_catalog_report.json"
timeout_seconds: 2400
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,42 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "attribution_report_products",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "attribution_report_performance_adgroup",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "attribution_report_performance_campaign",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "attribution_report_performance_creative",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
from .attribution_report import AttributionReportModel
from .common import CatalogModel, Keywords, MetricsReport, NegativeKeywords
from .profile import Profile
from .sponsored_brands import BrandsAdGroup, BrandsCampaign
Expand All @@ -43,4 +44,5 @@
"ProductCampaign",
"ProductTargeting",
"Profile",
"AttributionReportModel",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import List

from .common import CatalogModel


class Report(CatalogModel):
    # One entry of the ``reports`` list carried by AttributionReportModel.
    # Every field is declared as a required string.
    # NOTE(review): the camelCase names presumably mirror the keys of the
    # attribution report API response - confirm against the API reference.
    # Documented with comments rather than a docstring so that nothing about
    # the class object itself changes.

    # Reporting date of the row.
    date: str
    brandName: str
    marketplace: str
    campaignId: str
    productAsin: str
    productConversionType: str
    advertiserName: str
    adGroupId: str
    creativeId: str
    productName: str
    productCategory: str
    productSubcategory: str
    productGroup: str
    publisher: str


class AttributionReportModel(CatalogModel):
    # Model of a whole attribution report response; the AttributionReport
    # stream uses it as its ``model``.

    # The report rows; the stream reads its records from this key.
    reports: List[Report]
    # NOTE(review): presumably the number of rows in ``reports`` - confirm.
    size: int
    # Pagination cursor; the stream sends it back to request the next page.
    cursorId: str
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

from .schemas import Profile
from .streams import (
AttributionReportPerformanceAdgroup,
AttributionReportPerformanceCampaign,
AttributionReportPerformanceCreative,
AttributionReportProducts,
Profiles,
SponsoredBrandsAdGroups,
SponsoredBrandsCampaigns,
Expand Down Expand Up @@ -102,6 +106,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
SponsoredBrandsKeywords,
SponsoredBrandsReportStream,
SponsoredBrandsVideoReportStream,
AttributionReportPerformanceAdgroup,
AttributionReportPerformanceCampaign,
AttributionReportPerformanceCreative,
AttributionReportProducts,
]
return [profiles_stream, *[stream_class(**stream_args) for stream_class in non_profile_stream_classes]]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
from .attribution_report import (
AttributionReportPerformanceAdgroup,
AttributionReportPerformanceCampaign,
AttributionReportPerformanceCreative,
AttributionReportProducts,
)
from .profiles import Profiles
from .report_streams import (
SponsoredBrandsReportStream,
Expand Down Expand Up @@ -38,4 +44,8 @@
"SponsoredProductsReportStream",
"SponsoredBrandsReportStream",
"SponsoredBrandsVideoReportStream",
"AttributionReportPerformanceAdgroup",
"AttributionReportPerformanceCampaign",
"AttributionReportPerformanceCreative",
"AttributionReportProducts",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Iterable, Mapping, MutableMapping, Optional

import pendulum
import requests
from source_amazon_ads.schemas import AttributionReportModel, Profile
from source_amazon_ads.streams.common import AmazonAdsStream

# Extra metric that the ad-group and campaign level reports request on top of
# the PERFORMANCE metric set.
BRAND_REFERRAL_BONUS = "brb_bonus_amount"

# Metrics that appear, in this order, in both report types.
_SHARED_METRICS = (
    "attributedDetailPageViewsClicks14d",
    "attributedAddToCartClicks14d",
    "attributedPurchases14d",
    "unitsSold14d",
    "attributedSales14d",
)

_PERFORMANCE_METRICS = ["Click-throughs"]
_PERFORMANCE_METRICS.extend(_SHARED_METRICS)
_PERFORMANCE_METRICS.extend(
    (
        "attributedTotalDetailPageViewsClicks14d",
        "attributedTotalAddToCartClicks14d",
        "attributedTotalPurchases14d",
        "totalUnitsSold14d",
        "totalAttributedSales14d",
    )
)

_PRODUCTS_METRICS = list(_SHARED_METRICS)
_PRODUCTS_METRICS.extend(
    (
        "brandHaloDetailPageViewsClicks14d",
        "brandHaloAttributedAddToCartClicks14d",
        "brandHaloAttributedPurchases14d",
        "brandHaloUnitsSold14d",
        "brandHaloAttributedSales14d",
        "attributedNewToBrandPurchases14d",
        "attributedNewToBrandUnitsSold14d",
        "attributedNewToBrandSales14d",
        "brandHaloNewToBrandPurchases14d",
        "brandHaloNewToBrandUnitsSold14d",
        "brandHaloNewToBrandSales14d",
    )
)

# Report type -> ordered list of metric names requested for that report type.
METRICS_MAP = {
    "PERFORMANCE": _PERFORMANCE_METRICS,
    "PRODUCTS": _PRODUCTS_METRICS,
}


class AttributionReport(AmazonAdsStream):
    """
    Base stream for the Amazon Advertising API - Attribution Reports
    https://advertising.amazon.com/API/docs/en-us/amazon-attribution-prod-3p/#/

    A report is requested with a POST to ``/attribution/report``, once for each
    profile, and paged through with the ``cursorId`` value found in the
    response. Subclasses select the report by overriding ``report_type``,
    ``metrics`` and ``group_by``.
    """

    model = AttributionReportModel
    primary_key = None
    data_field = "reports"
    page_size = 300

    # Report selection; overridden by the concrete report streams.
    report_type = ""
    metrics = ""
    group_by = ""

    _next_page_token_field = "cursorId"
    # Profile whose report is currently being read (set in read_records).
    _current_profile_id = ""

    REPORT_DATE_FORMAT = "YYYYMMDD"
    CONFIG_DATE_FORMAT = "YYYY-MM-DD"
    # Upper bound, in days, on how far before the end date a request may start.
    REPORTING_PERIOD = 90

    def __init__(self, config: Mapping[str, Any], *args, **kwargs):
        """Remember the configured ``start_date`` and start with an empty request window."""
        self._req_start_date = ""
        self._req_end_date = ""
        self._start_date = config.get("start_date")

        super().__init__(config, *args, **kwargs)

    def _set_dates(self, profile: Profile):
        """
        Compute the request window for ``profile`` in the profile's timezone.

        Without a configured start date the window is yesterday..today.
        With one, the window starts at the later of the configured date and
        ``REPORTING_PERIOD`` days before today.
        """
        profile_tz = profile.timezone
        window_start = pendulum.now(tz=profile_tz).subtract(days=1).date()
        window_end = pendulum.now(tz=profile_tz).date()

        if self._start_date:
            # NOTE(review): max() requires self._start_date to be comparable with
            # a pendulum date - presumably the config value is parsed before it
            # reaches this stream; confirm against the caller.
            earliest_allowed = window_end.subtract(days=self.REPORTING_PERIOD)
            window_start = max(self._start_date, earliest_allowed)

        self._req_start_date = window_start.format(self.REPORT_DATE_FORMAT)
        self._req_end_date = window_end.format(self.REPORT_DATE_FORMAT)

    @property
    def http_method(self) -> str:
        """The report endpoint is queried with POST."""
        return "POST"

    def read_records(self, *args, **kvargs) -> Iterable[Mapping[str, Any]]:
        """
        Read all records of every profile in ``self._profiles``, one profile after another.

        An error raised while handling one profile is logged and the loop
        moves on to the next profile.
        """
        for profile in self._profiles:
            try:
                self._set_dates(profile)
                self._current_profile_id = profile.profileId
                yield from super().read_records(*args, **kvargs)
            except Exception as exc:
                self.logger.info("some error occurred: %s", exc)

    def request_headers(self, *args, **kvargs) -> MutableMapping[str, Any]:
        """Return the parent's headers plus the scope header of the current profile."""
        scoped_headers = super().request_headers(*args, **kvargs)
        scoped_headers["Amazon-Advertising-API-Scope"] = str(self._current_profile_id)
        return scoped_headers

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return ``{"cursorId": <value>}`` when the response carries a non-empty cursor, else ``None``."""
        cursor = response.json().get(self._next_page_token_field)
        if not cursor:
            return None
        return {self._next_page_token_field: cursor}

    def path(self, **kvargs) -> str:
        """Endpoint path shared by all attribution reports."""
        return "/attribution/report"

    def request_body_json(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Optional[Mapping]:
        """
        Build the JSON body of a report request.

        The cursor is an empty string on the first page and the value from
        ``next_page_token`` afterwards; ``groupBy`` is only included when the
        stream defines a non-empty ``group_by``.
        """
        cursor_key = self._next_page_token_field
        payload = {
            "reportType": self.report_type,
            "count": self.page_size,
            "metrics": self.metrics,
            "startDate": self._req_start_date,
            "endDate": self._req_end_date,
            cursor_key: next_page_token[cursor_key] if next_page_token else "",
        }

        if self.group_by:
            payload["groupBy"] = self.group_by

        return payload


class AttributionReportProducts(AttributionReport):
    """Attribution report of type ``PRODUCTS``, requested without a grouping."""

    report_type = "PRODUCTS"
    group_by = ""

    # Comma separated metric names configured for this report type.
    metrics = ",".join(METRICS_MAP[report_type])


class AttributionReportPerformanceCreative(AttributionReport):
    """Attribution report of type ``PERFORMANCE``, grouped by creative."""

    report_type = "PERFORMANCE"
    group_by = "CREATIVE"

    # Comma separated metric names configured for this report type.
    metrics = ",".join(METRICS_MAP[report_type])


class AttributionReportPerformanceAdgroup(AttributionReport):
    """
    Attribution report of type ``PERFORMANCE``, grouped by ad group.

    Requests the PERFORMANCE metrics plus the brand referral bonus metric.
    """

    report_type = "PERFORMANCE"

    # Build a NEW list instead of appending to METRICS_MAP[report_type]:
    # that list is shared module state, so an in-place append would leak the
    # extra metric into the map and add it once more for every class doing so.
    metrics_list = METRICS_MAP[report_type] + [BRAND_REFERRAL_BONUS]
    metrics = ",".join(metrics_list)

    group_by = "ADGROUP"


class AttributionReportPerformanceCampaign(AttributionReport):
    """
    Attribution report of type ``PERFORMANCE``, grouped by campaign.

    Requests the PERFORMANCE metrics plus the brand referral bonus metric.
    """

    report_type = "PERFORMANCE"

    # Same as for the ad-group report: copy, never mutate the shared list, so
    # the bonus metric is requested exactly once.
    metrics_list = METRICS_MAP[report_type] + [BRAND_REFERRAL_BONUS]
    metrics = ",".join(metrics_list)

    group_by = "CAMPAIGN"
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class AmazonAdsStream(HttpStream, BasicAmazonAdsStream):
Class for getting data from streams that based on single http request.
"""

data_field = ""

def __init__(self, config: Mapping[str, Any], *args, profiles: List[Profile] = None, **kwargs):
# Each AmazonAdsStream instance are dependant on list of profiles.
BasicAmazonAdsStream.__init__(self, config, profiles=profiles)
Expand All @@ -121,7 +123,10 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
:return an object representing single record in the response
"""
if response.status_code == HTTPStatus.OK:
yield from response.json()
if self.data_field:
yield from response.json().get(self.data_field, [])
else:
yield from response.json()
return

"""
Expand Down

0 comments on commit 454f84d

Please sign in to comment.