Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Amazon Ads - Generate slices by lazy evaluation #15637

Merged
merged 12 commits into from
Aug 16, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
- name: Amazon Ads
sourceDefinitionId: c6b0a29e-1da9-4512-9002-7bfd0cba2246
dockerRepository: airbyte/source-amazon-ads
dockerImageTag: 0.1.12
dockerImageTag: 0.1.14
documentationUrl: https://docs.airbyte.io/integrations/sources/amazon-ads
icon: amazonads.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-amazon-ads:0.1.12"
- dockerImage: "airbyte/source-amazon-ads:0.1.14"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/amazon-ads"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.version=0.1.14
LABEL io.airbyte.name=airbyte/source-amazon-ads
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-amazon-ads/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"pytest-mock~=3.7.0",
"jsonschema~=3.2.0",
"responses~=0.13.3",
"freezegun~=1.1.0",
"freezegun~=1.2.0",
]

setup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pydantic import BaseModel
from source_amazon_ads.schemas import CatalogModel, MetricsReport, Profile
from source_amazon_ads.streams.common import BasicAmazonAdsStream
from source_amazon_ads.utils import iterate_one_by_one

logger = AirbyteLogger()

Expand Down Expand Up @@ -92,6 +93,8 @@ class ReportStream(BasicAmazonAdsStream, ABC):
primary_key = ["profileId", "recordType", "reportDate", "updatedAt"]
# Amazon ads updates the data for the next 3 days
LOOK_BACK_WINDOW = 3
# https://advertising.amazon.com/API/docs/en-us/reporting/v2/faq#what-is-the-available-report-history-for-the-version-2-reporting-api
REPORTING_PERIOD = 60
# (Service limits section)
# Format used to specify metric generation date over Amazon Ads API.
REPORT_DATE_FORMAT = "YYYYMMDD"
Expand Down Expand Up @@ -265,35 +268,42 @@ def _send_http_request(self, url: str, profile_id: int, json: dict = None):
raise TooManyRequests()
return response

def get_date_range(self, start_date: Date, end_date: Date) -> Iterable[str]:
for days in range((end_date - start_date).days + 1):
yield start_date.add(days=days).format(ReportStream.REPORT_DATE_FORMAT)
def get_date_range(self, start_date: Date, timezone: str) -> Iterable[str]:
while True:
if start_date > pendulum.today(tz=timezone).date():
break
yield start_date.format(self.REPORT_DATE_FORMAT)
start_date = start_date.add(days=1)

def get_start_date(self, profile: Profile, stream_state: Mapping[str, Any]) -> Date:
today = pendulum.today(tz=profile.timezone).date()
start_date = stream_state.get(str(profile.profileId), {}).get(self.cursor_field)
if start_date:
start_date = pendulum.from_format(start_date, self.REPORT_DATE_FORMAT).date()
return max(start_date, today.subtract(days=60))
return max(start_date, today.subtract(days=self.REPORTING_PERIOD))
if self._start_date:
return max(self._start_date, today.subtract(days=60))
return max(self._start_date, today.subtract(days=self.REPORTING_PERIOD))
return today

def stream_profile_slices(self, profile: Profile, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
start_date = self.get_start_date(profile, stream_state)
for report_date in self.get_date_range(start_date, profile.timezone):
yield {"profile": profile, self.cursor_field: report_date}

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:

stream_state = stream_state or {}
no_data = True

generators = [self.stream_profile_slices(profile, stream_state) for profile in self._profiles]
for _slice in iterate_one_by_one(*generators):
no_data = False
yield _slice

slices = []
for profile in self._profiles:
today = pendulum.today(tz=profile.timezone).date()
start_date = self.get_start_date(profile, stream_state)
for report_date in self.get_date_range(start_date, today):
slices.append({"profile": profile, self.cursor_field: report_date})
if not slices:
return [None]
return slices
if no_data:
yield None

def get_updated_state(self, current_stream_state: Dict[str, Any], latest_data: Mapping[str, Any]) -> Mapping[str, Any]:
profileId = str(latest_data["profileId"])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


def iterate_one_by_one(*iterables):
iterables = list(iterables)
while iterables:
iterable = iterables.pop(0)
try:
yield next(iterable)
except StopIteration:
pass
else:
iterables.append(iterable)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import re
from base64 import b64decode
from datetime import timedelta
from functools import partial
from unittest import mock

import pytest
Expand Down Expand Up @@ -301,7 +303,7 @@ def __call__(self, request):
def test_display_report_stream_slices_full_refresh(config):
profiles = make_profiles()
stream = SponsoredDisplayReportStream(config, profiles, authenticator=mock.MagicMock())
slices = stream.stream_slices(SyncMode.full_refresh, cursor_field=stream.cursor_field)
slices = list(stream.stream_slices(SyncMode.full_refresh, cursor_field=stream.cursor_field))
assert slices == [{"profile": profiles[0], "reportDate": "20210729"}]


Expand All @@ -311,7 +313,7 @@ def test_display_report_stream_slices_incremental(config):
profiles = make_profiles()
stream = SponsoredDisplayReportStream(config, profiles, authenticator=mock.MagicMock())
stream_state = {str(profiles[0].profileId): {"reportDate": "20210725"}}
slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state)
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state))
assert slices == [
{"profile": profiles[0], "reportDate": "20210725"},
{"profile": profiles[0], "reportDate": "20210726"},
Expand All @@ -321,13 +323,13 @@ def test_display_report_stream_slices_incremental(config):
]

stream_state = {str(profiles[0].profileId): {"reportDate": "20210730"}}
slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state)
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state=stream_state))
assert slices == [None]

slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={})
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={}))
assert slices == [{"profile": profiles[0], "reportDate": "20210729"}]

slices = stream.stream_slices(SyncMode.incremental, cursor_field=None, stream_state={})
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=None, stream_state={}))
assert slices == [{"profile": profiles[0], "reportDate": "20210729"}]


Expand Down Expand Up @@ -358,5 +360,56 @@ def test_stream_slices_different_timezones(config):
profile1 = Profile(profileId=1, timezone="America/Los_Angeles", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller"))
profile2 = Profile(profileId=2, timezone="UTC", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller"))
stream = SponsoredProductsReportStream(config, [profile1, profile2], authenticator=mock.MagicMock())
slices = stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={})
slices = list(stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field, stream_state={}))
assert slices == [{"profile": profile1, "reportDate": "20210731"}, {"profile": profile2, "reportDate": "20210801"}]


def test_stream_slices_lazy_evaluation(config):
with freeze_time("2022-06-01T23:50:00+00:00") as frozen_datetime:
config["start_date"] = "2021-05-10"
profile1 = Profile(profileId=1, timezone="UTC", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller"))
profile2 = Profile(profileId=2, timezone="UTC", accountInfo=AccountInfo(marketplaceStringId="", id="", type="seller"))

stream = SponsoredProductsReportStream(config, [profile1, profile2], authenticator=mock.MagicMock())
stream.REPORTING_PERIOD = 5

slices = []
for _slice in stream.stream_slices(SyncMode.incremental, cursor_field=stream.cursor_field):
slices.append(_slice)
frozen_datetime.tick(delta=timedelta(minutes=10))

assert slices == [
{"profile": profile1, "reportDate": "20220527"},
{"profile": profile2, "reportDate": "20220528"},
{"profile": profile1, "reportDate": "20220528"},
{"profile": profile2, "reportDate": "20220529"},
{"profile": profile1, "reportDate": "20220529"},
{"profile": profile2, "reportDate": "20220530"},
{"profile": profile1, "reportDate": "20220530"},
{"profile": profile2, "reportDate": "20220531"},
{"profile": profile1, "reportDate": "20220531"},
{"profile": profile2, "reportDate": "20220601"},
{"profile": profile1, "reportDate": "20220601"},
{"profile": profile2, "reportDate": "20220602"},
{"profile": profile1, "reportDate": "20220602"},
]


def test_get_date_range_lazy_evaluation():
get_date_range = partial(SponsoredProductsReportStream.get_date_range, SponsoredProductsReportStream)

with freeze_time("2022-06-01T12:00:00+00:00") as frozen_datetime:
date_range = list(get_date_range(start_date=Date(2022, 5, 29), timezone="UTC"))
assert date_range == ["20220529", "20220530", "20220531", "20220601"]

date_range = list(get_date_range(start_date=Date(2022, 6, 1), timezone="UTC"))
assert date_range == ["20220601"]

date_range = list(get_date_range(start_date=Date(2022, 6, 2), timezone="UTC"))
assert date_range == []

date_range = []
for date in get_date_range(start_date=Date(2022, 5, 29), timezone="UTC"):
date_range.append(date)
frozen_datetime.tick(delta=timedelta(hours=3))
assert date_range == ["20220529", "20220530", "20220531", "20220601", "20220602"]
1 change: 1 addition & 0 deletions docs/integrations/sources/amazon-ads.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ Information about expected report generation waiting time you may find [here](ht

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------|
| 0.1.14 | 2022-08-15 | [15637](https://github.com/airbytehq/airbyte/pull/15637) | Generate slices by lazy evaluation |
| 0.1.12 | 2022-08-09 | [15469](https://github.com/airbytehq/airbyte/pull/15469) | Define primary_key for all report streams |
| 0.1.11 | 2022-07-28 | [15031](https://github.com/airbytehq/airbyte/pull/15031) | Improve report streams date-range generation |
| 0.1.10 | 2022-07-26 | [15042](https://github.com/airbytehq/airbyte/pull/15042) | Update `additionalProperties` field to true from schemas |
Expand Down