From 7d13ad5d5d38ed545156bf7d6a5805385f5dddc0 Mon Sep 17 00:00:00 2001 From: "Roman Yermilov [GL]" <86300758+roman-yermilov-gl@users.noreply.github.com> Date: Mon, 6 Feb 2023 20:53:18 +0400 Subject: [PATCH] Source Amazon Ads: fix reports stream records primary keys (#21677) * Source Amazon Ads: fix reports stream records primary keys * Source Amazon Ads: update changelog * #1332 source Amazon Ads: review fixes * #1332 source amazon ads: remove unused imports * #1332 source amazon ads: bump major version * auto-bump connector version --------- Co-authored-by: Denys Davydov Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-amazon-ads/Dockerfile | 2 +- .../acceptance-test-config.yml | 3 -- .../configured_catalog_report.json | 2 +- .../source_amazon_ads/schemas/common.py | 3 +- .../streams/report_streams/brands_report.py | 8 ++++ .../report_streams/brands_video_report.py | 8 ++++ .../streams/report_streams/display_report.py | 4 ++ .../streams/report_streams/products_report.py | 12 +++++ .../streams/report_streams/report_streams.py | 20 +++++--- .../unit_tests/test_report_streams.py | 47 +++++++++++++++---- docs/integrations/sources/amazon-ads.md | 3 +- 13 files changed, 92 insertions(+), 24 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 7e65e3642162a..4b87f1e73457a 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -61,7 +61,7 @@ - name: Amazon Ads sourceDefinitionId: c6b0a29e-1da9-4512-9002-7bfd0cba2246 dockerRepository: airbyte/source-amazon-ads - dockerImageTag: 0.1.29 + dockerImageTag: 1.0.0 documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-ads icon: amazonads.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 1a2ba41c4fd93..6d93b5ef780f9 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -804,7 +804,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-amazon-ads:0.1.29" +- dockerImage: "airbyte/source-amazon-ads:1.0.0" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/amazon-ads" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-amazon-ads/Dockerfile b/airbyte-integrations/connectors/source-amazon-ads/Dockerfile index 1c1d83c84e2a3..5f9e193d240e2 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/Dockerfile +++ b/airbyte-integrations/connectors/source-amazon-ads/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.29 +LABEL io.airbyte.version=1.0.0 LABEL io.airbyte.name=airbyte/source-amazon-ads diff --git a/airbyte-integrations/connectors/source-amazon-ads/acceptance-test-config.yml b/airbyte-integrations/connectors/source-amazon-ads/acceptance-test-config.yml index 322bbd96c6cc9..b9fb62d8f7e11 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-amazon-ads/acceptance-test-config.yml @@ -46,9 +46,6 @@ acceptance_tests: configured_catalog_path: integration_tests/configured_catalog.json - config_path: secrets/config_report.json configured_catalog_path: integration_tests/configured_catalog_report.json - ignored_fields: - sponsored_products_report_stream: - - updatedAt timeout_seconds: 3600 incremental: tests: diff --git a/airbyte-integrations/connectors/source-amazon-ads/integration_tests/configured_catalog_report.json b/airbyte-integrations/connectors/source-amazon-ads/integration_tests/configured_catalog_report.json index ded3a860befb7..ea2923cc85f74 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/integration_tests/configured_catalog_report.json +++ b/airbyte-integrations/connectors/source-amazon-ads/integration_tests/configured_catalog_report.json @@ -9,7 +9,7 @@ ["profileId"], ["recordType"], ["reportDate"], - ["updatedAt"] + ["recordId"] ] }, "sync_mode": "incremental", diff --git a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/schemas/common.py b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/schemas/common.py index fb97b9eae68f7..23a06dad68015 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/schemas/common.py +++ b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/schemas/common.py @@ -2,7 +2,6 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from datetime import datetime from decimal import Decimal from typing import Any, Dict, Iterable, Type @@ -43,7 +42,7 @@ class MetricsReport(CatalogModel): profileId: int recordType: str reportDate: str - updatedAt: datetime + recordId: str # This property will be overwritten with autogenerated model based on metrics list metric: None diff --git a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/brands_report.py b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/brands_report.py index 06f1f9ccfc62d..ad3df6aa6a951 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/brands_report.py +++ b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/brands_report.py @@ -99,6 +99,13 @@ } +METRICS_TYPE_TO_ID_MAP = { + "keywords": "keywordBid", + "adGroups": "adGroupId", + "campaigns": "campaignId", +} + + class SponsoredBrandsReportStream(ReportStream): """ https://advertising.amazon.com/API/docs/en-us/reference/sponsored-brands/2/reports @@ -108,6 +115,7 @@ def report_init_endpoint(self, record_type: str) -> str: return f"/v2/hsa/{record_type}/report" metrics_map = METRICS_MAP + metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP def _get_init_report_body(self, report_date: str, record_type: str, profile): metrics_list = self.metrics_map[record_type] diff --git a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/brands_video_report.py b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/brands_video_report.py index 12dd7017f146b..b1a0618babc21 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/brands_video_report.py +++ b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/brands_video_report.py @@ -68,6 +68,13 @@ } +METRICS_TYPE_TO_ID_MAP = { + "keywords": "keywordBid", + "adGroups": "adGroupId", + "campaigns": "campaignId", +} + + class SponsoredBrandsVideoReportStream(ReportStream): """ https://advertising.amazon.com/API/docs/en-us/reference/sponsored-brands/2/reports @@ -77,6 +84,7 @@ def report_init_endpoint(self, record_type: str) -> str: return f"/v2/hsa/{record_type}/report" metrics_map = METRICS_MAP + metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP def _get_init_report_body(self, report_date: str, record_type: str, profile): metrics_list = self.metrics_map[record_type] diff --git a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/display_report.py b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/display_report.py index 1dbd7156df480..24e2df3ad8051 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/display_report.py +++ b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/display_report.py @@ -165,6 +165,9 @@ } +METRICS_TYPE_TO_ID_MAP = {"campaigns": "campaignId", "adGroups": "adGroupId", "productAds": "adId", "targets": "targetId", "asins": "asin"} + + class Tactics(str, Enum): T00001 = "T00001" T00020 = "T00020" @@ -181,6 +184,7 @@ def report_init_endpoint(self, record_type: str) -> str: return f"/sd/{record_type}/report" metrics_map = METRICS_MAP + metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP def _get_init_report_body(self, report_date: str, record_type: str, profile): if record_type == RecordType.ASINS and profile.accountInfo.type == "vendor": diff --git a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/products_report.py b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/products_report.py index ad06f0aae6f0a..85e6c3e9e6f86 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/products_report.py +++ b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/products_report.py @@ -243,6 +243,17 @@ } +METRICS_TYPE_TO_ID_MAP = { + "campaigns": "campaignId", + "adGroups": "adGroupId", + "keywords": "keywordId", + "productAds": "adId", + "asins_keywords": "asin", + "asins_targets": "asin", + "targets": "targetId", +} + + class SponsoredProductsReportStream(ReportStream): """ https://advertising.amazon.com/API/docs/en-us/sponsored-products/2-0/openapi#/Reports @@ -252,6 +263,7 @@ def report_init_endpoint(self, record_type: str) -> str: return f"/v2/sp/{record_type}/report" metrics_map = METRICS_MAP + metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP def _get_init_report_body(self, report_date: str, record_type: str, profile): metrics_list = self.metrics_map[record_type] diff --git a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/report_streams.py b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/report_streams.py index 7359bbbc884b7..888b5165febe4 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/report_streams.py +++ b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/report_streams.py @@ -89,7 +89,7 @@ class ReportStream(BasicAmazonAdsStream, ABC): Common base class for report streams """ - primary_key = ["profileId", "recordType", "reportDate", "updatedAt"] + primary_key = ["profileId", "recordType", "reportDate", "recordId"] # https://advertising.amazon.com/API/docs/en-us/reporting/v2/faq#what-is-the-available-report-history-for-the-version-2-reporting-api REPORTING_PERIOD = 60 # (Service limits section) @@ -160,7 +160,7 @@ def read_records( profileId=report_info.profile_id, recordType=report_info.record_type, reportDate=report_date, - updatedAt=pendulum.now(tz=profile.timezone).replace(microsecond=0).to_iso8601_string(), + recordId=metric_object[self.metrics_type_to_id_map[report_info.record_type]], metric=metric_object, ).dict() @@ -245,6 +245,13 @@ def metrics_map(self) -> Dict[str, List]: :return: Map record type to list of available metrics """ + @property + @abstractmethod + def metrics_type_to_id_map(self) -> Dict[str, List]: + """ + :return: Map record type to to its unique identifier in metrics + """ + def _check_status(self, report_info: ReportInfo) -> Tuple[Status, str]: """ Check report status and return download link if report generated successfuly @@ -290,6 +297,7 @@ def get_start_date(self, profile: Profile, stream_state: Mapping[str, Any]) -> D start_date = stream_state.get(str(profile.profileId), {}).get(self.cursor_field) if start_date: start_date = pendulum.from_format(start_date, self.REPORT_DATE_FORMAT).date() + # Taking date from state if it's not older than 60 days return max(start_date, today.subtract(days=self.REPORTING_PERIOD)) if self._start_date: return max(self._start_date, today.subtract(days=self.REPORTING_PERIOD)) @@ -361,13 +369,13 @@ def _init_reports(self, profile: Profile, report_date: str) -> List[ReportInfo]: continue # Some of the record types has subtypes. For example asins type # for product report have keyword and targets subtypes and it - # repseneted as asins_keywords and asins_targets types. Those - # subtypes have mutualy excluded parameters so we requesting + # represented as asins_keywords and asins_targets types. Those + # subtypes have mutually excluded parameters so we requesting # different metric list for each record. - record_type = record_type.split("_")[0] + request_record_type = record_type.split("_")[0] self.logger.info(f"Initiating report generation for {profile.profileId} profile with {record_type} type for {report_date} date") response = self._send_http_request( - urljoin(self._url, self.report_init_endpoint(record_type)), + urljoin(self._url, self.report_init_endpoint(request_record_type)), profile.profileId, report_init_body, ) diff --git a/airbyte-integrations/connectors/source-amazon-ads/unit_tests/test_report_streams.py b/airbyte-integrations/connectors/source-amazon-ads/unit_tests/test_report_streams.py index a72a5f709e33f..81bacc27a6396 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/unit_tests/test_report_streams.py +++ b/airbyte-integrations/connectors/source-amazon-ads/unit_tests/test_report_streams.py @@ -36,31 +36,62 @@ [ { "campaignId": 214078428, - "campaignName": "sample-campaign-name-214078428" + "campaignName": "sample-campaign-name-214078428", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021" }, { "campaignId": 44504582, - "campaignName": "sample-campaign-name-44504582" + "campaignName": "sample-campaign-name-44504582", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021" }, { "campaignId": 509144838, - "campaignName": "sample-campaign-name-509144838" + "campaignName": "sample-campaign-name-509144838", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021" }, { "campaignId": 231712082, - "campaignName": "sample-campaign-name-231712082" + "campaignName": "sample-campaign-name-231712082", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021" }, { "campaignId": 895306040, - "campaignName": "sample-campaign-name-895306040" + "campaignName": "sample-campaign-name-895306040", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021" } ] """ METRIC_RESPONSE = b64decode( """ -H4sIAAAAAAAAAIvmUlCoBmIFBaXkxNyCxMz0PM8UJSsFI0MTA3MLEyMLHVRJv8TcVKC0UjGQn5Oq -CxPWzQOK68I1KQE11ergMNrExNTAxNTCiBSTYXrwGmxqYGloYmJhTJKb4ZrwGm1kbGhuaGRAmqPh -mvAabWFpamxgZmBiQIrRcE1go7liAYX9dsTHAQAA +H4sIANnqymMC/92SsYrCQBBA+3zFsrWBmdnZ7K6lTbSRgyvFYjFBwl2iJIqI+O+3p2aPEyxSmmKLnceb4jHJKhHiEp +4QcuPrva+2zaKQU0HIYCyTnfyHS1+XAcsu/L/LtB+nTZinUZIPyxd5uzvubxtlxg5Q8R97jDOtCJB0Dw6+3ZaHOzQO +A1SM0eqq5hfkAPDxOUemnnyV59OuLWbVTdSIpNgZfsL3tS7TxioglAFeJy8aMGtgbWlIgt4ZRwENDpmtGnQFURpHA1 +JokGDYGURpHA2s0woyYBjSIErv1SBZJz+HyV3zFgUAAA== """ ) METRICS_COUNT = 5 diff --git a/docs/integrations/sources/amazon-ads.md b/docs/integrations/sources/amazon-ads.md index 79efcff41b576..5ded4dda4fe42 100644 --- a/docs/integrations/sources/amazon-ads.md +++ b/docs/integrations/sources/amazon-ads.md @@ -94,8 +94,9 @@ Information about expected report generation waiting time you may find [here](ht | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------| +| 1.0.0 | 2023-01-30 | [21677](https://github.com/airbytehq/airbyte/pull/21677) | Fix bug with non-unique primary keys in report streams. Add asins_keywords and asins_targets | | 0.1.29 | 2023-01-27 | [22038](https://github.com/airbytehq/airbyte/pull/22038) | Set `AvailabilityStrategy` for streams explicitly to `None` | -| 0.1.28 | 2023-01-18 | [19491](https://github.com/airbytehq/airbyte/pull/19491) | Add option to customize look back window value +| 0.1.28 | 2023-01-18 | [19491](https://github.com/airbytehq/airbyte/pull/19491) | Add option to customize look back window value | | 0.1.27 | 2023-01-05 | [21082](https://github.com/airbytehq/airbyte/pull/21082) | Fix bug with handling: "Report date is too far in the past." - partial revert of #20662 | | 0.1.26 | 2022-12-19 | [20662](https://github.com/airbytehq/airbyte/pull/20662) | Fix bug with handling: "Report date is too far in the past." | | 0.1.25 | 2022-11-08 | [18985](https://github.com/airbytehq/airbyte/pull/18985) | Remove "report_wait_timeout", "report_generation_max_retries" from config |