Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Amazon Ads: fix reports stream records primary keys #21677

Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.29
LABEL io.airbyte.version=0.1.30
LABEL io.airbyte.name=airbyte/source-amazon-ads
Expand Up @@ -46,9 +46,6 @@ acceptance_tests:
configured_catalog_path: integration_tests/configured_catalog.json
- config_path: secrets/config_report.json
configured_catalog_path: integration_tests/configured_catalog_report.json
ignored_fields:
sponsored_products_report_stream:
- updatedAt
timeout_seconds: 3600
incremental:
tests:
Expand Down
Expand Up @@ -9,7 +9,7 @@
["profileId"],
["recordType"],
["reportDate"],
["updatedAt"]
["recordId"]
]
},
"sync_mode": "incremental",
Expand Down
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from datetime import datetime
from decimal import Decimal
from typing import Any, Dict, Iterable, Type

Expand Down Expand Up @@ -43,7 +42,7 @@ class MetricsReport(CatalogModel):
profileId: int
recordType: str
reportDate: str
updatedAt: datetime
recordId: str
# This property will be overwritten with autogenerated model based on metrics list
metric: None

Expand Down
Expand Up @@ -99,6 +99,13 @@
}


# Maps each Sponsored Brands record type to the metric field whose value is
# used as the identifier of a record of that type.
# NOTE(review): "keywordBid" reads like a bid amount rather than an identifier;
# confirm it is the intended field for "keywords" records.
METRICS_TYPE_TO_ID_MAP = {"keywords": "keywordBid", "adGroups": "adGroupId", "campaigns": "campaignId"}


class SponsoredBrandsReportStream(ReportStream):
"""
https://advertising.amazon.com/API/docs/en-us/reference/sponsored-brands/2/reports
Expand All @@ -108,6 +115,7 @@ def report_init_endpoint(self, record_type: str) -> str:
return f"/v2/hsa/{record_type}/report"

metrics_map = METRICS_MAP
metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP

def _get_init_report_body(self, report_date: str, record_type: str, profile):
metrics_list = self.metrics_map[record_type]
Expand Down
Expand Up @@ -68,6 +68,13 @@
}


# Maps each Sponsored Brands Video record type to the metric field whose value
# is used as the identifier of a record of that type.
# NOTE(review): "keywordBid" reads like a bid amount rather than an identifier;
# confirm it is the intended field for "keywords" records.
METRICS_TYPE_TO_ID_MAP = {"keywords": "keywordBid", "adGroups": "adGroupId", "campaigns": "campaignId"}


class SponsoredBrandsVideoReportStream(ReportStream):
"""
https://advertising.amazon.com/API/docs/en-us/reference/sponsored-brands/2/reports
Expand All @@ -77,6 +84,7 @@ def report_init_endpoint(self, record_type: str) -> str:
return f"/v2/hsa/{record_type}/report"

metrics_map = METRICS_MAP
metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP

def _get_init_report_body(self, report_date: str, record_type: str, profile):
metrics_list = self.metrics_map[record_type]
Expand Down
Expand Up @@ -165,6 +165,9 @@
}


# Maps each Sponsored Display record type to the metric field whose value is
# used as the identifier of a record of that type.
METRICS_TYPE_TO_ID_MAP = {
    "campaigns": "campaignId",
    "adGroups": "adGroupId",
    "productAds": "adId",
    "targets": "targetId",
    "asins": "asin",
}


class Tactics(str, Enum):
T00001 = "T00001"
T00020 = "T00020"
Expand All @@ -181,6 +184,7 @@ def report_init_endpoint(self, record_type: str) -> str:
return f"/sd/{record_type}/report"

metrics_map = METRICS_MAP
metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP

def _get_init_report_body(self, report_date: str, record_type: str, profile):
if record_type == RecordType.ASINS and profile.accountInfo.type == "vendor":
Expand Down
Expand Up @@ -243,6 +243,17 @@
}


# Maps each Sponsored Products record type to the metric field whose value is
# used as the identifier of a record of that type.
METRICS_TYPE_TO_ID_MAP = {
    "campaigns": "campaignId",
    "adGroups": "adGroupId",
    "keywords": "keywordId",
    "productAds": "adId",
    # Both asins subtypes are identified by the same "asin" field.
    **dict.fromkeys(("asins_keywords", "asins_targets"), "asin"),
    "targets": "targetId",
}


class SponsoredProductsReportStream(ReportStream):
"""
https://advertising.amazon.com/API/docs/en-us/sponsored-products/2-0/openapi#/Reports
Expand All @@ -252,6 +263,7 @@ def report_init_endpoint(self, record_type: str) -> str:
return f"/v2/sp/{record_type}/report"

metrics_map = METRICS_MAP
metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP

def _get_init_report_body(self, report_date: str, record_type: str, profile):
metrics_list = self.metrics_map[record_type]
Expand Down
Expand Up @@ -89,7 +89,7 @@ class ReportStream(BasicAmazonAdsStream, ABC):
Common base class for report streams
"""

primary_key = ["profileId", "recordType", "reportDate", "updatedAt"]
primary_key = ["profileId", "recordType", "reportDate", "recordId"]
# https://advertising.amazon.com/API/docs/en-us/reporting/v2/faq#what-is-the-available-report-history-for-the-version-2-reporting-api
REPORTING_PERIOD = 60
# (Service limits section)
Expand Down Expand Up @@ -160,7 +160,7 @@ def read_records(
profileId=report_info.profile_id,
recordType=report_info.record_type,
reportDate=report_date,
updatedAt=pendulum.now(tz=profile.timezone).replace(microsecond=0).to_iso8601_string(),
recordId=metric_object[self.metrics_type_to_id_map[report_info.record_type]],
metric=metric_object,
).dict()

Expand Down Expand Up @@ -245,6 +245,13 @@ def metrics_map(self) -> Dict[str, List]:
:return: Map record type to list of available metrics
"""

@property
@abstractmethod
def metrics_type_to_id_map(self) -> Dict[str, str]:
    """
    :return: Map record type to the name of the metric field that holds its unique identifier
    """

def _check_status(self, report_info: ReportInfo) -> Tuple[Status, str]:
"""
Check report status and return download link if report generated successfully
Expand Down Expand Up @@ -290,6 +297,7 @@ def get_start_date(self, profile: Profile, stream_state: Mapping[str, Any]) -> D
start_date = stream_state.get(str(profile.profileId), {}).get(self.cursor_field)
if start_date:
start_date = pendulum.from_format(start_date, self.REPORT_DATE_FORMAT).date()
# Taking date from state if it's not older than 60 days
davydov-d marked this conversation as resolved.
Show resolved Hide resolved
return max(start_date, today.subtract(days=self.REPORTING_PERIOD))
if self._start_date:
return max(self._start_date, today.subtract(days=self.REPORTING_PERIOD))
Expand Down Expand Up @@ -361,13 +369,13 @@ def _init_reports(self, profile: Profile, report_date: str) -> List[ReportInfo]:
continue
# Some of the record types has subtypes. For example asins type
# for product report have keyword and targets subtypes and it
# repseneted as asins_keywords and asins_targets types. Those
# subtypes have mutualy excluded parameters so we requesting
# represented as asins_keywords and asins_targets types. Those
# subtypes have mutually excluded parameters so we requesting
# different metric list for each record.
record_type = record_type.split("_")[0]
request_record_type = record_type.split("_")[0]
self.logger.info(f"Initiating report generation for {profile.profileId} profile with {record_type} type for {report_date} date")
response = self._send_http_request(
urljoin(self._url, self.report_init_endpoint(record_type)),
urljoin(self._url, self.report_init_endpoint(request_record_type)),
profile.profileId,
report_init_body,
)
Expand Down
Expand Up @@ -36,31 +36,62 @@
[
{
"campaignId": 214078428,
"campaignName": "sample-campaign-name-214078428"
"campaignName": "sample-campaign-name-214078428",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
},
{
"campaignId": 44504582,
"campaignName": "sample-campaign-name-44504582"
"campaignName": "sample-campaign-name-44504582",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
},
{
"campaignId": 509144838,
"campaignName": "sample-campaign-name-509144838"
"campaignName": "sample-campaign-name-509144838",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
},
{
"campaignId": 231712082,
"campaignName": "sample-campaign-name-231712082"
"campaignName": "sample-campaign-name-231712082",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
},
{
"campaignId": 895306040,
"campaignName": "sample-campaign-name-895306040"
"campaignName": "sample-campaign-name-895306040",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
}
]
"""
METRIC_RESPONSE = b64decode(
"""
H4sIAAAAAAAAAIvmUlCoBmIFBaXkxNyCxMz0PM8UJSsFI0MTA3MLEyMLHVRJv8TcVKC0UjGQn5Oq
CxPWzQOK68I1KQE11ergMNrExNTAxNTCiBSTYXrwGmxqYGloYmJhTJKb4ZrwGm1kbGhuaGRAmqPh
mvAabWFpamxgZmBiQIrRcE1go7liAYX9dsTHAQAA
H4sIANnqymMC/92SsYrCQBBA+3zFsrWBmdnZ7K6lTbSRgyvFYjFBwl2iJIqI+O+3p2aPEyxSmmKLnceb4jHJKhHiEp
4QcuPrva+2zaKQU0HIYCyTnfyHS1+XAcsu/L/LtB+nTZinUZIPyxd5uzvubxtlxg5Q8R97jDOtCJB0Dw6+3ZaHOzQO
A1SM0eqq5hfkAPDxOUemnnyV59OuLWbVTdSIpNgZfsL3tS7TxioglAFeJy8aMGtgbWlIgt4ZRwENDpmtGnQFURpHA1
JokGDYGURpHA2s0woyYBjSIErv1SBZJz+HyV3zFgUAAA==
"""
)
METRICS_COUNT = 5
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/amazon-ads.md
Expand Up @@ -94,8 +94,9 @@ Information about expected report generation waiting time you may find [here](ht

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------|
| 0.1.30 | 2023-01-30 | [21677](https://github.com/airbytehq/airbyte/pull/21677) | Fix bug with non-unique primary keys in report streams. Add asins_keywords and asins_targets |
| 0.1.29 | 2023-01-27 | [22038](https://github.com/airbytehq/airbyte/pull/22038) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.1.28 | 2023-01-18 | [19491](https://github.com/airbytehq/airbyte/pull/19491) | Add option to customize look back window value
| 0.1.28 | 2023-01-18 | [19491](https://github.com/airbytehq/airbyte/pull/19491) | Add option to customize look back window value |
| 0.1.27 | 2023-01-05 | [21082](https://github.com/airbytehq/airbyte/pull/21082) | Fix bug with handling: "Report date is too far in the past." - partial revert of #20662 |
| 0.1.26 | 2022-12-19 | [20662](https://github.com/airbytehq/airbyte/pull/20662) | Fix bug with handling: "Report date is too far in the past." |
| 0.1.25 | 2022-11-08 | [18985](https://github.com/airbytehq/airbyte/pull/18985) | Remove "report_wait_timeout", "report_generation_max_retries" from config |
Expand Down