Skip to content

Commit

Permalink
Source Amazon Ads: fix reports stream records primary keys (#21677)
Browse files Browse the repository at this point in the history
* Source Amazon Ads: fix reports stream records primary keys

* Source Amazon Ads: update changelog

* #1332 source Amazon Ads: review fixes

* #1332 source amazon ads: remove unused imports

* #1332 source amazon ads: bump major version

* auto-bump connector version

---------

Co-authored-by: Denys Davydov <davydov.den18@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
3 people committed Feb 6, 2023
1 parent 449c3d8 commit 7d13ad5
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 24 deletions.
Expand Up @@ -61,7 +61,7 @@
- name: Amazon Ads
sourceDefinitionId: c6b0a29e-1da9-4512-9002-7bfd0cba2246
dockerRepository: airbyte/source-amazon-ads
dockerImageTag: 0.1.29
dockerImageTag: 1.0.0
documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-ads
icon: amazonads.svg
sourceType: api
Expand Down
Expand Up @@ -804,7 +804,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-amazon-ads:0.1.29"
- dockerImage: "airbyte/source-amazon-ads:1.0.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/amazon-ads"
connectionSpecification:
Expand Down
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.29
LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.name=airbyte/source-amazon-ads
Expand Up @@ -46,9 +46,6 @@ acceptance_tests:
configured_catalog_path: integration_tests/configured_catalog.json
- config_path: secrets/config_report.json
configured_catalog_path: integration_tests/configured_catalog_report.json
ignored_fields:
sponsored_products_report_stream:
- updatedAt
timeout_seconds: 3600
incremental:
tests:
Expand Down
Expand Up @@ -9,7 +9,7 @@
["profileId"],
["recordType"],
["reportDate"],
["updatedAt"]
["recordId"]
]
},
"sync_mode": "incremental",
Expand Down
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from datetime import datetime
from decimal import Decimal
from typing import Any, Dict, Iterable, Type

Expand Down Expand Up @@ -43,7 +42,7 @@ class MetricsReport(CatalogModel):
profileId: int
recordType: str
reportDate: str
updatedAt: datetime
recordId: str
# This property will be overwritten with autogenerated model based on metrics list
metric: None

Expand Down
Expand Up @@ -99,6 +99,13 @@
}


# Maps each report record type to the name of the metric field whose value is
# used as the record's `recordId` (the last component of the report stream
# primary key).
# NOTE(review): "keywords" maps to "keywordBid" rather than an id-style field
# such as "keywordId" — confirm this value is unique per keyword record.
METRICS_TYPE_TO_ID_MAP = {
"keywords": "keywordBid",
"adGroups": "adGroupId",
"campaigns": "campaignId",
}


class SponsoredBrandsReportStream(ReportStream):
"""
https://advertising.amazon.com/API/docs/en-us/reference/sponsored-brands/2/reports
Expand All @@ -108,6 +115,7 @@ def report_init_endpoint(self, record_type: str) -> str:
return f"/v2/hsa/{record_type}/report"

metrics_map = METRICS_MAP
metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP

def _get_init_report_body(self, report_date: str, record_type: str, profile):
metrics_list = self.metrics_map[record_type]
Expand Down
Expand Up @@ -68,6 +68,13 @@
}


# Maps each video report record type to the name of the metric field whose
# value is used as the record's `recordId` (the last component of the report
# stream primary key).
# NOTE(review): "keywords" maps to "keywordBid" rather than an id-style field
# such as "keywordId" — confirm this value is unique per keyword record.
METRICS_TYPE_TO_ID_MAP = {
"keywords": "keywordBid",
"adGroups": "adGroupId",
"campaigns": "campaignId",
}


class SponsoredBrandsVideoReportStream(ReportStream):
"""
https://advertising.amazon.com/API/docs/en-us/reference/sponsored-brands/2/reports
Expand All @@ -77,6 +84,7 @@ def report_init_endpoint(self, record_type: str) -> str:
return f"/v2/hsa/{record_type}/report"

metrics_map = METRICS_MAP
metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP

def _get_init_report_body(self, report_date: str, record_type: str, profile):
metrics_list = self.metrics_map[record_type]
Expand Down
Expand Up @@ -165,6 +165,9 @@
}


# Maps each report record type to the name of the metric field whose value is
# used as the record's `recordId` (the last component of the report stream
# primary key).
METRICS_TYPE_TO_ID_MAP = {"campaigns": "campaignId", "adGroups": "adGroupId", "productAds": "adId", "targets": "targetId", "asins": "asin"}


class Tactics(str, Enum):
T00001 = "T00001"
T00020 = "T00020"
Expand All @@ -181,6 +184,7 @@ def report_init_endpoint(self, record_type: str) -> str:
return f"/sd/{record_type}/report"

metrics_map = METRICS_MAP
metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP

def _get_init_report_body(self, report_date: str, record_type: str, profile):
if record_type == RecordType.ASINS and profile.accountInfo.type == "vendor":
Expand Down
Expand Up @@ -243,6 +243,17 @@
}


# Maps each report record type to the name of the metric field whose value is
# used as the record's `recordId` (the last component of the report stream
# primary key). Both asins subtypes ("asins_keywords" and "asins_targets")
# are identified by the same "asin" field.
METRICS_TYPE_TO_ID_MAP = {
"campaigns": "campaignId",
"adGroups": "adGroupId",
"keywords": "keywordId",
"productAds": "adId",
"asins_keywords": "asin",
"asins_targets": "asin",
"targets": "targetId",
}


class SponsoredProductsReportStream(ReportStream):
"""
https://advertising.amazon.com/API/docs/en-us/sponsored-products/2-0/openapi#/Reports
Expand All @@ -252,6 +263,7 @@ def report_init_endpoint(self, record_type: str) -> str:
return f"/v2/sp/{record_type}/report"

metrics_map = METRICS_MAP
metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP

def _get_init_report_body(self, report_date: str, record_type: str, profile):
metrics_list = self.metrics_map[record_type]
Expand Down
Expand Up @@ -89,7 +89,7 @@ class ReportStream(BasicAmazonAdsStream, ABC):
Common base class for report streams
"""

primary_key = ["profileId", "recordType", "reportDate", "updatedAt"]
primary_key = ["profileId", "recordType", "reportDate", "recordId"]
# https://advertising.amazon.com/API/docs/en-us/reporting/v2/faq#what-is-the-available-report-history-for-the-version-2-reporting-api
REPORTING_PERIOD = 60
# (Service limits section)
Expand Down Expand Up @@ -160,7 +160,7 @@ def read_records(
profileId=report_info.profile_id,
recordType=report_info.record_type,
reportDate=report_date,
updatedAt=pendulum.now(tz=profile.timezone).replace(microsecond=0).to_iso8601_string(),
recordId=metric_object[self.metrics_type_to_id_map[report_info.record_type]],
metric=metric_object,
).dict()

Expand Down Expand Up @@ -245,6 +245,13 @@ def metrics_map(self) -> Dict[str, List]:
:return: Map record type to list of available metrics
"""

@property
@abstractmethod
def metrics_type_to_id_map(self) -> Dict[str, str]:
    """
    :return: Map record type to the name of the metric field that uniquely
        identifies a record of that type. The value of that field is used as
        the record's ``recordId``.
    """

def _check_status(self, report_info: ReportInfo) -> Tuple[Status, str]:
"""
Check report status and return download link if report generated successfully
Expand Down Expand Up @@ -290,6 +297,7 @@ def get_start_date(self, profile: Profile, stream_state: Mapping[str, Any]) -> D
start_date = stream_state.get(str(profile.profileId), {}).get(self.cursor_field)
if start_date:
start_date = pendulum.from_format(start_date, self.REPORT_DATE_FORMAT).date()
# Taking date from state if it's not older than 60 days
return max(start_date, today.subtract(days=self.REPORTING_PERIOD))
if self._start_date:
return max(self._start_date, today.subtract(days=self.REPORTING_PERIOD))
Expand Down Expand Up @@ -361,13 +369,13 @@ def _init_reports(self, profile: Profile, report_date: str) -> List[ReportInfo]:
continue
# Some of the record types have subtypes. For example, the asins type
# for product report has keyword and targets subtypes and it is
# repseneted as asins_keywords and asins_targets types. Those
# subtypes have mutualy excluded parameters so we requesting
# represented as asins_keywords and asins_targets types. Those
# subtypes have mutually exclusive parameters, so we request a
# different metric list for each record.
record_type = record_type.split("_")[0]
request_record_type = record_type.split("_")[0]
self.logger.info(f"Initiating report generation for {profile.profileId} profile with {record_type} type for {report_date} date")
response = self._send_http_request(
urljoin(self._url, self.report_init_endpoint(record_type)),
urljoin(self._url, self.report_init_endpoint(request_record_type)),
profile.profileId,
report_init_body,
)
Expand Down
Expand Up @@ -36,31 +36,62 @@
[
{
"campaignId": 214078428,
"campaignName": "sample-campaign-name-214078428"
"campaignName": "sample-campaign-name-214078428",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
},
{
"campaignId": 44504582,
"campaignName": "sample-campaign-name-44504582"
"campaignName": "sample-campaign-name-44504582",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
},
{
"campaignId": 509144838,
"campaignName": "sample-campaign-name-509144838"
"campaignName": "sample-campaign-name-509144838",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
},
{
"campaignId": 231712082,
"campaignName": "sample-campaign-name-231712082"
"campaignName": "sample-campaign-name-231712082",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
},
{
"campaignId": 895306040,
"campaignName": "sample-campaign-name-895306040"
"campaignName": "sample-campaign-name-895306040",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
}
]
"""
METRIC_RESPONSE = b64decode(
"""
H4sIAAAAAAAAAIvmUlCoBmIFBaXkxNyCxMz0PM8UJSsFI0MTA3MLEyMLHVRJv8TcVKC0UjGQn5Oq
CxPWzQOK68I1KQE11ergMNrExNTAxNTCiBSTYXrwGmxqYGloYmJhTJKb4ZrwGm1kbGhuaGRAmqPh
mvAabWFpamxgZmBiQIrRcE1go7liAYX9dsTHAQAA
H4sIANnqymMC/92SsYrCQBBA+3zFsrWBmdnZ7K6lTbSRgyvFYjFBwl2iJIqI+O+3p2aPEyxSmmKLnceb4jHJKhHiEp
4QcuPrva+2zaKQU0HIYCyTnfyHS1+XAcsu/L/LtB+nTZinUZIPyxd5uzvubxtlxg5Q8R97jDOtCJB0Dw6+3ZaHOzQO
A1SM0eqq5hfkAPDxOUemnnyV59OuLWbVTdSIpNgZfsL3tS7TxioglAFeJy8aMGtgbWlIgt4ZRwENDpmtGnQFURpHA1
JokGDYGURpHA2s0woyYBjSIErv1SBZJz+HyV3zFgUAAA==
"""
)
METRICS_COUNT = 5
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/amazon-ads.md
Expand Up @@ -94,8 +94,9 @@ Information about expected report generation waiting time you may find [here](ht

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------|
| 1.0.0 | 2023-01-30 | [21677](https://github.com/airbytehq/airbyte/pull/21677) | Fix bug with non-unique primary keys in report streams. Add asins_keywords and asins_targets |
| 0.1.29 | 2023-01-27 | [22038](https://github.com/airbytehq/airbyte/pull/22038) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.1.28 | 2023-01-18 | [19491](https://github.com/airbytehq/airbyte/pull/19491) | Add option to customize look back window value
| 0.1.28 | 2023-01-18 | [19491](https://github.com/airbytehq/airbyte/pull/19491) | Add option to customize look back window value |
| 0.1.27 | 2023-01-05 | [21082](https://github.com/airbytehq/airbyte/pull/21082) | Fix bug with handling: "Report date is too far in the past." - partial revert of #20662 |
| 0.1.26 | 2022-12-19 | [20662](https://github.com/airbytehq/airbyte/pull/20662) | Fix bug with handling: "Report date is too far in the past." |
| 0.1.25 | 2022-11-08 | [18985](https://github.com/airbytehq/airbyte/pull/18985) | Remove "report_wait_timeout", "report_generation_max_retries" from config |
Expand Down

0 comments on commit 7d13ad5

Please sign in to comment.