From b21b387c4fb9031dd27bdf1ba516a3b1e7a3183c Mon Sep 17 00:00:00 2001 From: Serhii Chvaliuk Date: Tue, 16 Aug 2022 19:51:40 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20Amazon=20Ads=20-=20Gene?= =?UTF-8?q?rate=20slices=20by=20lazy=20evaluation=20(#15637)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sergey Chvalyuk --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-amazon-ads/Dockerfile | 2 +- .../connectors/source-amazon-ads/setup.py | 2 +- .../streams/report_streams/report_streams.py | 38 +++++++---- .../source_amazon_ads/utils.py | 15 +++++ .../unit_tests/test_report_streams.py | 65 +++++++++++++++++-- docs/integrations/sources/amazon-ads.md | 1 + 8 files changed, 103 insertions(+), 24 deletions(-) create mode 100644 airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/utils.py diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 981b6f69e200..b870dc0e4b78 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -17,7 +17,7 @@ - name: Amazon Ads sourceDefinitionId: c6b0a29e-1da9-4512-9002-7bfd0cba2246 dockerRepository: airbyte/source-amazon-ads - dockerImageTag: 0.1.12 + dockerImageTag: 0.1.14 documentationUrl: https://docs.airbyte.io/integrations/sources/amazon-ads icon: amazonads.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 4c2402091ec0..52660d3de78e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -87,7 +87,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-amazon-ads:0.1.12" +- dockerImage: "airbyte/source-amazon-ads:0.1.14" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/amazon-ads" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-amazon-ads/Dockerfile b/airbyte-integrations/connectors/source-amazon-ads/Dockerfile index b0ebacf5609e..0c9c33346bf2 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/Dockerfile +++ b/airbyte-integrations/connectors/source-amazon-ads/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.12 +LABEL io.airbyte.version=0.1.14 LABEL io.airbyte.name=airbyte/source-amazon-ads diff --git a/airbyte-integrations/connectors/source-amazon-ads/setup.py b/airbyte-integrations/connectors/source-amazon-ads/setup.py index 5d9c4a766fd8..7f4200df6c1b 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/setup.py +++ b/airbyte-integrations/connectors/source-amazon-ads/setup.py @@ -12,7 +12,7 @@ "pytest-mock~=3.7.0", "jsonschema~=3.2.0", "responses~=0.13.3", - "freezegun~=1.1.0", + "freezegun~=1.2.0", ] setup( diff --git a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/report_streams.py b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/report_streams.py index 01814cdd6c78..032e3611182d 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/report_streams.py +++ b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/report_streams/report_streams.py @@ -21,6 +21,7 @@ from pydantic import BaseModel from source_amazon_ads.schemas import CatalogModel, MetricsReport, Profile from source_amazon_ads.streams.common import BasicAmazonAdsStream +from source_amazon_ads.utils import iterate_one_by_one logger = AirbyteLogger() @@ -92,6 +93,8 @@ class ReportStream(BasicAmazonAdsStream, ABC): primary_key = ["profileId", "recordType", "reportDate", "updatedAt"] # Amazon ads updates the data for the next 3 days LOOK_BACK_WINDOW = 3 + # https://advertising.amazon.com/API/docs/en-us/reporting/v2/faq#what-is-the-available-report-history-for-the-version-2-reporting-api + REPORTING_PERIOD = 60 # (Service limits section) # Format used to specify metric generation date over Amazon Ads API. REPORT_DATE_FORMAT = "YYYYMMDD" @@ -265,35 +268,42 @@ def _send_http_request(self, url: str, profile_id: int, json: dict = None): raise TooManyRequests() return response - def get_date_range(self, start_date: Date, end_date: Date) -> Iterable[str]: - for days in range((end_date - start_date).days + 1): - yield start_date.add(days=days).format(ReportStream.REPORT_DATE_FORMAT) + def get_date_range(self, start_date: Date, timezone: str) -> Iterable[str]: + while True: + if start_date > pendulum.today(tz=timezone).date(): + break + yield start_date.format(self.REPORT_DATE_FORMAT) + start_date = start_date.add(days=1) def get_start_date(self, profile: Profile, stream_state: Mapping[str, Any]) -> Date: today = pendulum.today(tz=profile.timezone).date() start_date = stream_state.get(str(profile.profileId), {}).get(self.cursor_field) if start_date: start_date = pendulum.from_format(start_date, self.REPORT_DATE_FORMAT).date() - return max(start_date, today.subtract(days=60)) + return max(start_date, today.subtract(days=self.REPORTING_PERIOD)) if self._start_date: - return max(self._start_date, today.subtract(days=60)) + return max(self._start_date, today.subtract(days=self.REPORTING_PERIOD)) return today + def stream_profile_slices(self, profile: Profile, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: + start_date = self.get_start_date(profile, stream_state) + for report_date in self.get_date_range(start_date, profile.timezone): + yield {"profile": profile, self.cursor_field: report_date} + def stream_slices( self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[Mapping[str, Any]]]: stream_state = stream_state or {} + no_data = True + + generators = [self.stream_profile_slices(profile, stream_state) for profile in self._profiles] + for _slice in iterate_one_by_one(*generators): + no_data = False + yield _slice - slices = [] - for profile in self._profiles: - today = pendulum.today(tz=profile.timezone).date() - start_date = self.get_start_date(profile, stream_state) - for report_date in self.get_date_range(start_date, today): - slices.append({"profile": profile, self.cursor_field: report_date}) - if not slices: - return [None] - return slices + if no_data: + yield None def get_updated_state(self, current_stream_state: Dict[str, Any], latest_data: Mapping[str, Any]) -> Mapping[str, Any]: profileId = str(latest_data["profileId"]) diff --git a/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/utils.py b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/utils.py new file mode 100644 index 000000000000..caa66c1d13bb --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/utils.py @@ -0,0 +1,15 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +def iterate_one_by_one(*iterables): + iterables = list(iterables) + while iterables: + iterable = iterables.pop(0) + try: + yield next(iterable) + except StopIteration: + pass + else: + iterables.append(iterable) diff --git a/airbyte-integrations/connectors/source-amazon-ads/unit_tests/test_report_streams.py b/airbyte-integrations/connectors/source-amazon-ads/unit_tests/test_report_streams.py index 3bd230139e36..c4c0765ea726 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/unit_tests/test_report_streams.py +++ b/airbyte-integrations/connectors/source-amazon-ads/unit_tests/test_report_streams.py @@ -4,6 +4,8 @@ import re from base64 import b64decode +from datetime import timedelta +from functools import partial from unittest import mock import pytest @@ -301,7 +303,7 @@ def __call__(self, request): def test_display_report_stream_slices_full_refresh(config): profiles = make_profiles() stream = SponsoredDisplayReportStream(config, profiles, authenticator=mock.MagicMock()) - slices = stream.stream_slices(SyncMode.full_refresh, cursor_field=stream.cursor_field) + slices = list(stream.stream_slices(SyncMode.full_refresh, cursor_field=stream.cursor_field)) assert slices == [{"profile": profiles[0], "reportDate": "20210729"}] @@ -311,7 +313,7 @@ def test_display_report_stream_slices_incremental(config): profiles = make_profiles() stream = SponsoredDisplayReportStream(config, profiles, authenticator=mock.MagicMock()) stream_state = {str(profiles[0].profileId): {"reportDate": "20210725"}} - slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state) + slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state)) assert slices == [ {"profile": profiles[0], "reportDate": "20210725"}, {"profile": profiles[0], "reportDate": "20210726"}, @@ -321,13 +323,13 @@ def test_display_report_stream_slices_incremental(config): ] stream_state = {str(profiles[0].profileId): {"reportDate": "20210730"}} - slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state) + slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state)) assert slices == [None] - slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={}) + slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={})) assert slices == [{"profile": profiles[0], "reportDate": "20210729"}] - slices = stream.stream_slices(SyncMode.incremental, cursor_field=None, stream_state={}) + slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=None, stream_state={})) assert slices == [{"profile": profiles[0], "reportDate": "20210729"}] @@ -358,5 +360,56 @@ def test_stream_slices_different_timezones(config): profile1 = Profile(profileId=1, timezone="America/Los_Angeles", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller")) profile2 = Profile(profileId=2, timezone="UTC", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller")) stream = SponsoredProductsReportStream(config, [profile1, profile2], authenticator=mock.MagicMock()) - slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={}) + slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={})) assert slices == [{"profile": profile1, "reportDate": "20210731"}, {"profile": profile2, "reportDate": "20210801"}] + + +def test_stream_slices_lazy_evaluation(config): + with freeze_time("2022-06-01T23:50:00+00:00") as frozen_datetime: + config["start_date"] = "2021-05-10" + profile1 = Profile(profileId=1, timezone="UTC", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller")) + profile2 = Profile(profileId=2, timezone="UTC", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller")) + + stream = SponsoredProductsReportStream(config, [profile1, profile2], authenticator=mock.MagicMock()) + stream.REPORTING_PERIOD = 5 + + slices = [] + for _slice in stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field): + slices.append(_slice) + frozen_datetime.tick(delta=timedelta(minutes=10)) + + assert slices == [ + {"profile": profile1, "reportDate": "20220527"}, + {"profile": profile2, "reportDate": "20220528"}, + {"profile": profile1, "reportDate": "20220528"}, + {"profile": profile2, "reportDate": "20220529"}, + {"profile": profile1, "reportDate": "20220529"}, + {"profile": profile2, "reportDate": "20220530"}, + {"profile": profile1, "reportDate": "20220530"}, + {"profile": profile2, "reportDate": "20220531"}, + {"profile": profile1, "reportDate": "20220531"}, + {"profile": profile2, "reportDate": "20220601"}, + {"profile": profile1, "reportDate": "20220601"}, + {"profile": profile2, "reportDate": "20220602"}, + {"profile": profile1, "reportDate": "20220602"}, + ] + + +def test_get_date_range_lazy_evaluation(): + get_date_range = partial(SponsoredProductsReportStream.get_date_range, SponsoredProductsReportStream) + + with freeze_time("2022-06-01T12:00:00+00:00") as frozen_datetime: + date_range = list(get_date_range(start_date=Date(2022, 5, 29), timezone="UTC")) + assert date_range == ["20220529", "20220530", "20220531", "20220601"] + + date_range = list(get_date_range(start_date=Date(2022, 6, 1), timezone="UTC")) + assert date_range == ["20220601"] + + date_range = list(get_date_range(start_date=Date(2022, 6, 2), timezone="UTC")) + assert date_range == [] + + date_range = [] + for date in get_date_range(start_date=Date(2022, 5, 29), timezone="UTC"): + date_range.append(date) + frozen_datetime.tick(delta=timedelta(hours=3)) + assert date_range == ["20220529", "20220530", "20220531", "20220601", "20220602"] diff --git a/docs/integrations/sources/amazon-ads.md b/docs/integrations/sources/amazon-ads.md index 8a072bdc3d12..a5f2f7a2e859 100644 --- a/docs/integrations/sources/amazon-ads.md +++ b/docs/integrations/sources/amazon-ads.md @@ -90,6 +90,7 @@ Information about expected report generation waiting time you may find [here](ht | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------| +| 0.1.14 | 2022-08-15 | [15637](https://github.com/airbytehq/airbyte/pull/15637) | Generate slices by lazy evaluation | | 0.1.12 | 2022-08-09 | [15469](https://github.com/airbytehq/airbyte/pull/15469) | Define primary_key for all report streams | | 0.1.11 | 2022-07-28 | [15031](https://github.com/airbytehq/airbyte/pull/15031) | Improve report streams date-range generation | | 0.1.10 | 2022-07-26 | [15042](https://github.com/airbytehq/airbyte/pull/15042) | Update `additionalProperties` field to true from schemas |