Skip to content

Commit

Permalink
🐛 Source Amazon Ads - Generate slices by lazy evaluation (#15637)
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr authored and rodireich committed Aug 20, 2022
1 parent 8258c4e commit b21b387
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
- name: Amazon Ads
sourceDefinitionId: c6b0a29e-1da9-4512-9002-7bfd0cba2246
dockerRepository: airbyte/source-amazon-ads
dockerImageTag: 0.1.12
dockerImageTag: 0.1.14
documentationUrl: https://docs.airbyte.io/integrations/sources/amazon-ads
icon: amazonads.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-amazon-ads:0.1.12"
- dockerImage: "airbyte/source-amazon-ads:0.1.14"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/amazon-ads"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.version=0.1.14
LABEL io.airbyte.name=airbyte/source-amazon-ads
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-amazon-ads/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"pytest-mock~=3.7.0",
"jsonschema~=3.2.0",
"responses~=0.13.3",
"freezegun~=1.1.0",
"freezegun~=1.2.0",
]

setup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pydantic import BaseModel
from source_amazon_ads.schemas import CatalogModel, MetricsReport, Profile
from source_amazon_ads.streams.common import BasicAmazonAdsStream
from source_amazon_ads.utils import iterate_one_by_one

logger = AirbyteLogger()

Expand Down Expand Up @@ -92,6 +93,8 @@ class ReportStream(BasicAmazonAdsStream, ABC):
primary_key = ["profileId", "recordType", "reportDate", "updatedAt"]
# Amazon ads updates the data for the next 3 days
LOOK_BACK_WINDOW = 3
# Number of days of report history the version 2 Reporting API makes available; see
# https://advertising.amazon.com/API/docs/en-us/reporting/v2/faq#what-is-the-available-report-history-for-the-version-2-reporting-api
REPORTING_PERIOD = 60
# (Service limits section)
# Format used to specify metric generation date over Amazon Ads API.
REPORT_DATE_FORMAT = "YYYYMMDD"
Expand Down Expand Up @@ -265,35 +268,42 @@ def _send_http_request(self, url: str, profile_id: int, json: dict = None):
raise TooManyRequests()
return response

def get_date_range(self, start_date: Date, timezone: str) -> Iterable[str]:
    """Lazily yield report dates from ``start_date`` through today in ``timezone``.

    "Today" is re-read before every yielded date instead of being fixed up
    front, so a consumer that iterates slowly across midnight also receives
    the new day. Nothing is yielded when ``start_date`` is already after today.

    :param start_date: first date to emit.
    :param timezone: timezone name used to decide what "today" is.
    :return: dates formatted with ``REPORT_DATE_FORMAT``, one per day.
    """
    current = start_date
    # The upper bound is evaluated on every pass on purpose (lazy evaluation).
    while current <= pendulum.today(tz=timezone).date():
        yield current.format(self.REPORT_DATE_FORMAT)
        current = current.add(days=1)

def get_start_date(self, profile: Profile, stream_state: Mapping[str, Any]) -> Date:
    """Return the first report date to request for ``profile``.

    Preference order: the cursor saved in ``stream_state`` for this profile,
    then the configured ``self._start_date``. Either one is clamped so that it
    is never older than ``REPORTING_PERIOD`` days before today. When neither
    is set, today (in the profile's timezone) is returned.

    :param profile: profile whose ``profileId`` and ``timezone`` are used.
    :param stream_state: mapping of profile id (as string) to saved cursor values.
    :return: the date to start generating report slices from.
    """
    today = pendulum.today(tz=profile.timezone).date()
    # Use the class constant rather than a literal so subclasses and tests can override it.
    earliest = today.subtract(days=self.REPORTING_PERIOD)

    saved_cursor = stream_state.get(str(profile.profileId), {}).get(self.cursor_field)
    if saved_cursor:
        cursor_date = pendulum.from_format(saved_cursor, self.REPORT_DATE_FORMAT).date()
        return max(cursor_date, earliest)
    if self._start_date:
        return max(self._start_date, earliest)
    return today

def stream_profile_slices(self, profile: Profile, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
    """Yield one slice per report date for a single profile.

    Each slice pairs the profile with a report date under ``self.cursor_field``.
    The start date is resolved only once iteration begins, because this is a
    generator.
    """
    first_day = self.get_start_date(profile, stream_state)
    report_dates = self.get_date_range(first_day, profile.timezone)
    yield from ({"profile": profile, self.cursor_field: day} for day in report_dates)

def stream_slices(
    self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
    """Lazily yield report slices for all profiles, interleaved profile by profile.

    One generator is created per profile and they are consumed in turn via
    ``iterate_one_by_one``, so no slice is computed before it is requested.
    If no profile produces any slice, a single ``None`` is yielded instead.

    :param sync_mode: requested sync mode (not used by this implementation).
    :param cursor_field: requested cursor field (not used; ``self.cursor_field`` is used instead).
    :param stream_state: saved per-profile state; treated as empty when ``None``.
    """
    stream_state = stream_state or {}
    produced_any = False

    per_profile = [self.stream_profile_slices(profile, stream_state) for profile in self._profiles]
    for _slice in iterate_one_by_one(*per_profile):
        produced_any = True
        yield _slice

    if not produced_any:
        yield None

def get_updated_state(self, current_stream_state: Dict[str, Any], latest_data: Mapping[str, Any]) -> Mapping[str, Any]:
profileId = str(latest_data["profileId"])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


def iterate_one_by_one(*iterables):
    """Yield items from several iterables in round-robin order.

    One item is taken from each iterable in turn; an exhausted iterable is
    dropped and the rest keep rotating until all are exhausted. Items are
    pulled only when requested, so lazy generators passed in stay lazy.

    :param iterables: any iterables (generators, iterators, lists, ...).
    :return: a generator over the interleaved items.
    """
    from collections import deque

    # iter() lets plain iterables (e.g. lists) be used, and is a no-op for iterators.
    # A deque gives O(1) removal from the front, unlike list.pop(0).
    pending = deque(iter(source) for source in iterables)
    while pending:
        current = pending.popleft()
        try:
            item = next(current)
        except StopIteration:
            # Exhausted: do not put it back into the rotation.
            continue
        yield item
        pending.append(current)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import re
from base64 import b64decode
from datetime import timedelta
from functools import partial
from unittest import mock

import pytest
Expand Down Expand Up @@ -301,7 +303,7 @@ def __call__(self, request):
def test_display_report_stream_slices_full_refresh(config):
    """A full-refresh sync produces exactly one slice, for the first profile."""
    profiles = make_profiles()
    stream = SponsoredDisplayReportStream(config, profiles, authenticator=mock.MagicMock())
    # stream_slices is a generator, so materialize it before comparing with a list.
    slices = list(stream.stream_slices(SyncMode.full_refresh, cursor_field=stream.cursor_field))
    assert slices == [{"profile": profiles[0], "reportDate": "20210729"}]


Expand All @@ -311,7 +313,7 @@ def test_display_report_stream_slices_incremental(config):
profiles = make_profiles()
stream = SponsoredDisplayReportStream(config, profiles, authenticator=mock.MagicMock())
stream_state = {str(profiles[0].profileId): {"reportDate": "20210725"}}
slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state)
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state))
assert slices == [
{"profile": profiles[0], "reportDate": "20210725"},
{"profile": profiles[0], "reportDate": "20210726"},
Expand All @@ -321,13 +323,13 @@ def test_display_report_stream_slices_incremental(config):
]

stream_state = {str(profiles[0].profileId): {"reportDate": "20210730"}}
slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state)
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state))
assert slices == [None]

slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={})
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={}))
assert slices == [{"profile": profiles[0], "reportDate": "20210729"}]

slices = stream.stream_slices(SyncMode.incremental, cursor_field=None, stream_state={})
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=None, stream_state={}))
assert slices == [{"profile": profiles[0], "reportDate": "20210729"}]


Expand Down Expand Up @@ -358,5 +360,56 @@ def test_stream_slices_different_timezones(config):
profile1 = Profile(profileId=1, timezone="America/Los_Angeles", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller"))
profile2 = Profile(profileId=2, timezone="UTC", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller"))
stream = SponsoredProductsReportStream(config, [profile1, profile2], authenticator=mock.MagicMock())
slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={})
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={}))
assert slices == [{"profile": profile1, "reportDate": "20210731"}, {"profile": profile2, "reportDate": "20210801"}]


def test_stream_slices_lazy_evaluation(config):
    """Check that stream slices are produced lazily while the clock advances.

    The clock is frozen at 23:50 UTC on 2022-06-01 and moved forward by ten
    minutes after every consumed slice, so it crosses midnight right after the
    first slice is taken.
    """
    with freeze_time("2022-06-01T23:50:00+00:00") as frozen_datetime:
        config["start_date"] = "2021-05-10"
        profile1 = Profile(profileId=1, timezone="UTC", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller"))
        profile2 = Profile(profileId=2, timezone="UTC", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller"))

        stream = SponsoredProductsReportStream(config, [profile1, profile2], authenticator=mock.MagicMock())
        # Override the reporting period on this instance so the expected list stays short.
        stream.REPORTING_PERIOD = 5

        slices = []
        for _slice in stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field):
            slices.append(_slice)
            # Move the frozen clock between slices; the order of these two statements matters.
            frozen_datetime.tick(delta=timedelta(minutes=10))

        # Slices alternate between the two profiles. profile2's first date is one day
        # later than profile1's although both are configured identically, which is
        # consistent with its start date being computed only after midnight has passed.
        # Both profiles run through 2022-06-02, the date "today" has become by then.
        assert slices == [
            {"profile": profile1, "reportDate": "20220527"},
            {"profile": profile2, "reportDate": "20220528"},
            {"profile": profile1, "reportDate": "20220528"},
            {"profile": profile2, "reportDate": "20220529"},
            {"profile": profile1, "reportDate": "20220529"},
            {"profile": profile2, "reportDate": "20220530"},
            {"profile": profile1, "reportDate": "20220530"},
            {"profile": profile2, "reportDate": "20220531"},
            {"profile": profile1, "reportDate": "20220531"},
            {"profile": profile2, "reportDate": "20220601"},
            {"profile": profile1, "reportDate": "20220601"},
            {"profile": profile2, "reportDate": "20220602"},
            {"profile": profile1, "reportDate": "20220602"},
        ]


def test_get_date_range_lazy_evaluation():
    """Check that get_date_range re-reads "today" while it is being iterated."""
    # Bind the class itself as ``self`` so the method can be called without building a stream.
    get_date_range = partial(SponsoredProductsReportStream.get_date_range, SponsoredProductsReportStream)

    with freeze_time("2022-06-01T12:00:00+00:00") as frozen_datetime:
        # With the clock standing still, the range runs from the start date through today.
        date_range = list(get_date_range(start_date=Date(2022, 5, 29), timezone="UTC"))
        assert date_range == ["20220529", "20220530", "20220531", "20220601"]

        # Starting today yields just today.
        date_range = list(get_date_range(start_date=Date(2022, 6, 1), timezone="UTC"))
        assert date_range == ["20220601"]

        # Starting after today yields nothing.
        date_range = list(get_date_range(start_date=Date(2022, 6, 2), timezone="UTC"))
        assert date_range == []

        date_range = []
        for date in get_date_range(start_date=Date(2022, 5, 29), timezone="UTC"):
            date_range.append(date)
            # Three hours per consumed date: after four dates the clock reaches 2022-06-02.
            frozen_datetime.tick(delta=timedelta(hours=3))
        # The extra day appears because the clock crossed midnight during iteration.
        assert date_range == ["20220529", "20220530", "20220531", "20220601", "20220602"]
1 change: 1 addition & 0 deletions docs/integrations/sources/amazon-ads.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ Information about expected report generation waiting time you may find [here](ht

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------|
| 0.1.14 | 2022-08-15 | [15637](https://github.com/airbytehq/airbyte/pull/15637) | Generate slices by lazy evaluation |
| 0.1.12 | 2022-08-09 | [15469](https://github.com/airbytehq/airbyte/pull/15469) | Define primary_key for all report streams |
| 0.1.11 | 2022-07-28 | [15031](https://github.com/airbytehq/airbyte/pull/15031) | Improve report streams date-range generation |
| 0.1.10 | 2022-07-26 | [15042](https://github.com/airbytehq/airbyte/pull/15042) | Update `additionalProperties` field to true from schemas |
Expand Down

0 comments on commit b21b387

Please sign in to comment.