From 6f70b6b4238b110ab0501e42348c790fa08cbcf6 Mon Sep 17 00:00:00 2001 From: Dmytro Date: Wed, 10 Nov 2021 11:43:45 +0200 Subject: [PATCH] Iterable: split email send stream into slices. (#7780) --- .../2e875208-0c0b-4ee4-9e92-1cb3156ea799.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../connectors/source-iterable/Dockerfile | 2 +- .../connectors/source-iterable/setup.py | 4 +- .../source-iterable/source_iterable/api.py | 123 +---------- .../source_iterable/iterable_streams.py | 204 ++++++++++++++++++ .../unit_tests/test_exports_stream.py | 64 +++++- .../source-iterable/unit_tests/test_source.py | 51 +++++ docs/integrations/sources/iterable.md | 1 + 9 files changed, 329 insertions(+), 124 deletions(-) create mode 100644 airbyte-integrations/connectors/source-iterable/source_iterable/iterable_streams.py create mode 100644 airbyte-integrations/connectors/source-iterable/unit_tests/test_source.py diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/2e875208-0c0b-4ee4-9e92-1cb3156ea799.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/2e875208-0c0b-4ee4-9e92-1cb3156ea799.json index 6cb4f103e6d2..6dd7e7ae1c90 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/2e875208-0c0b-4ee4-9e92-1cb3156ea799.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/2e875208-0c0b-4ee4-9e92-1cb3156ea799.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "2e875208-0c0b-4ee4-9e92-1cb3156ea799", "name": "Iterable", "dockerRepository": "airbyte/source-iterable", - "dockerImageTag": "0.1.10", + "dockerImageTag": "0.1.12", "documentationUrl": "https://docs.airbyte.io/integrations/sources/iterable" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 9108faaf1716..27c850214c46 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml 
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -268,7 +268,7 @@ - name: Iterable sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799 dockerRepository: airbyte/source-iterable - dockerImageTag: 0.1.11 + dockerImageTag: 0.1.12 documentationUrl: https://docs.airbyte.io/integrations/sources/iterable sourceType: api - name: Jira diff --git a/airbyte-integrations/connectors/source-iterable/Dockerfile b/airbyte-integrations/connectors/source-iterable/Dockerfile index b24c983735c7..7d626d2e0a50 100644 --- a/airbyte-integrations/connectors/source-iterable/Dockerfile +++ b/airbyte-integrations/connectors/source-iterable/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.11 +LABEL io.airbyte.version=0.1.12 LABEL io.airbyte.name=airbyte/source-iterable diff --git a/airbyte-integrations/connectors/source-iterable/setup.py b/airbyte-integrations/connectors/source-iterable/setup.py index 893e468fb733..d9ad8dad2653 100644 --- a/airbyte-integrations/connectors/source-iterable/setup.py +++ b/airbyte-integrations/connectors/source-iterable/setup.py @@ -7,11 +7,11 @@ MAIN_REQUIREMENTS = [ "airbyte-cdk~=0.1", - "pendulum~=1.2", + "pendulum~=2.1.2", "requests~=2.25", ] -TEST_REQUIREMENTS = ["pytest~=6.1", "responses==0.13.3"] +TEST_REQUIREMENTS = ["pytest~=6.1", "responses==0.13.3", "freezegun==1.1.0"] setup( diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py index d3d9eb77870d..93e49e37fe13 100755 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py @@ -3,129 +3,18 @@ # import csv -import json import urllib.parse as urlparse -from abc import ABC, abstractmethod from io import StringIO -from typing 
import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union +from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional -import pendulum import requests from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams.http import HttpStream +from source_iterable.iterable_streams import IterableExportStream, IterableExportStreamRanged, IterableStream EVENT_ROWS_LIMIT = 200 CAMPAIGNS_PER_REQUEST = 20 -class IterableStream(HttpStream, ABC): - - # Hardcode the value because it is not returned from the API - BACKOFF_TIME_CONSTANT = 10.0 - # define date-time fields with potential wrong format - - url_base = "https://api.iterable.com/api/" - primary_key = "id" - - def __init__(self, api_key, **kwargs): - super().__init__(**kwargs) - self._api_key = api_key - - @property - @abstractmethod - def data_field(self) -> str: - """ - :return: Default field name to get data from response - """ - - def backoff_time(self, response: requests.Response) -> Optional[float]: - return self.BACKOFF_TIME_CONSTANT - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - """ - Iterable API does not support pagination - """ - return None - - def request_params(self, **kwargs) -> MutableMapping[str, Any]: - return {"api_key": self._api_key} - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_json = response.json() - records = response_json.get(self.data_field, []) - - for record in records: - yield record - - -class IterableExportStream(IterableStream, ABC): - - cursor_field = "createdAt" - primary_key = None - - def __init__(self, start_date, **kwargs): - super().__init__(**kwargs) - self._start_date = pendulum.parse(start_date) - self.stream_params = {"dataTypeName": self.data_field} - - def path(self, **kwargs) -> str: - return "export/data.json" - - @staticmethod - def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime: - if isinstance(value, int): - 
value = pendulum.from_timestamp(value / 1000.0) - elif isinstance(value, str): - value = pendulum.parse(value) - else: - raise ValueError(f"Unsupported type of datetime field {type(value)}") - return value - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - """ - Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object - and returning an updated state object. - """ - latest_benchmark = latest_record[self.cursor_field] - if current_stream_state.get(self.cursor_field): - return {self.cursor_field: str(max(latest_benchmark, self._field_to_datetime(current_stream_state[self.cursor_field])))} - return {self.cursor_field: str(latest_benchmark)} - - def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: - - params = super().request_params(stream_state=stream_state) - start_datetime = self._start_date - if stream_state.get(self.cursor_field): - start_datetime = pendulum.parse(stream_state[self.cursor_field]) - - params.update( - {"startDateTime": start_datetime.strftime("%Y-%m-%d %H:%M:%S"), "endDateTime": pendulum.now().strftime("%Y-%m-%d %H:%M:%S")}, - **self.stream_params, - ) - return params - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - for obj in response.iter_lines(): - record = json.loads(obj) - record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field]) - yield record - - def request_kwargs( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> Mapping[str, Any]: - """ - https://api.iterable.com/api/docs#export_exportDataJson - Sending those type of requests could download large piece of json - objects splitted with newline character. 
- Passing stream=True argument to requests.session.send method to avoid - loading whole analytics report content into memory. - """ - return {"stream": True} - - class Lists(IterableStream): data_field = "lists" @@ -257,7 +146,7 @@ class EmailBounce(IterableExportStream): data_field = "emailBounce" -class EmailClick(IterableExportStream): +class EmailClick(IterableExportStreamRanged): name = "email_click" data_field = "emailClick" @@ -267,17 +156,17 @@ class EmailComplaint(IterableExportStream): data_field = "emailComplaint" -class EmailOpen(IterableExportStream): +class EmailOpen(IterableExportStreamRanged): name = "email_open" data_field = "emailOpen" -class EmailSend(IterableExportStream): +class EmailSend(IterableExportStreamRanged): name = "email_send" data_field = "emailSend" -class EmailSendSkip(IterableExportStream): +class EmailSendSkip(IterableExportStreamRanged): name = "email_send_skip" data_field = "emailSendSkip" diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/iterable_streams.py b/airbyte-integrations/connectors/source-iterable/source_iterable/iterable_streams.py new file mode 100644 index 000000000000..a28ea9b6c5e3 --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/iterable_streams.py @@ -0,0 +1,204 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# +import json +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union + +import pendulum +import requests +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams.http import HttpStream +from pendulum.datetime import DateTime + + +@dataclass +class StreamSlice: + start_date: DateTime + end_date: DateTime + + +class IterableStream(HttpStream, ABC): + + # Hardcode the value because it is not returned from the API + BACKOFF_TIME_CONSTANT = 10.0 + # define date-time fields with potential wrong format + + url_base = "https://api.iterable.com/api/" + primary_key = "id" + + def __init__(self, api_key, **kwargs): + super().__init__(**kwargs) + self._api_key = api_key + + @property + @abstractmethod + def data_field(self) -> str: + """ + :return: Default field name to get data from response + """ + + def backoff_time(self, response: requests.Response) -> Optional[float]: + return self.BACKOFF_TIME_CONSTANT + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + """ + Iterable API does not support pagination + """ + return None + + def request_params(self, **kwargs) -> MutableMapping[str, Any]: + return {"api_key": self._api_key} + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + records = response_json.get(self.data_field, []) + + for record in records: + yield record + + +class IterableExportStream(IterableStream, ABC): + + cursor_field = "createdAt" + primary_key = None + + def __init__(self, start_date, **kwargs): + super().__init__(**kwargs) + self._start_date = pendulum.parse(start_date) + self.stream_params = {"dataTypeName": self.data_field} + + def path(self, **kwargs) -> str: + return "export/data.json" + + def backoff_time(self, response: requests.Response) -> Optional[float]: + # Use default exponential backoff + return None + + # 
For python backoff package expo backoff delays calculated according to formula: + # delay = factor * base ** n where base is 2 + # With default factor equal to 5 and 5 retries delays would be 5, 10, 20, 40 and 80 seconds. + # For exports stream there is a limit of 4 requests per minute. + # Tune up factor and retries to send a lot of excessive requests before the timeout is exceeded. + @property + def retry_factor(self) -> int: + return 20 + + # With factor 20 it would be 20, 40, 80 and 160 seconds delays. + @property + def max_retries(self) -> Union[int, None]: + return 4 + + @staticmethod + def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime: + if isinstance(value, int): + value = pendulum.from_timestamp(value / 1000.0) + elif isinstance(value, str): + value = pendulum.parse(value, strict=False) + else: + raise ValueError(f"Unsupported type of datetime field {type(value)}") + return value + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + """ + Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object + and returning an updated state object. 
+ """ + latest_benchmark = latest_record[self.cursor_field] + if current_stream_state.get(self.cursor_field): + return {self.cursor_field: str(max(latest_benchmark, self._field_to_datetime(current_stream_state[self.cursor_field])))} + return {self.cursor_field: str(latest_benchmark)} + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: StreamSlice, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + + params = super().request_params(stream_state=stream_state) + params.update( + { + "startDateTime": stream_slice.start_date.strftime("%Y-%m-%d %H:%M:%S"), + "endDateTime": stream_slice.end_date.strftime("%Y-%m-%d %H:%M:%S"), + }, + **self.stream_params, + ) + return params + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + for obj in response.iter_lines(): + record = json.loads(obj) + record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field]) + yield record + + def request_kwargs( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Mapping[str, Any]: + """ + https://api.iterable.com/api/docs#export_exportDataJson + Sending those type of requests could download large piece of json + objects splitted with newline character. + Passing stream=True argument to requests.session.send method to avoid + loading whole analytics report content into memory. 
+ """ + return {"stream": True} + + def get_start_date(self, stream_state: Mapping[str, Any]) -> DateTime: + stream_state = stream_state or {} + start_datetime = self._start_date + if stream_state.get(self.cursor_field): + start_datetime = pendulum.parse(stream_state[self.cursor_field]) + return start_datetime + + def stream_slices( + self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[StreamSlice]]: + + start_datetime = self.get_start_date(stream_state) + return [StreamSlice(start_datetime, pendulum.now("UTC"))] + + +class IterableExportStreamRanged(IterableExportStream): + + RANGE_LENGTH_DAYS = 90 + + @staticmethod + def make_datetime_ranges(start: DateTime, end: DateTime, range_days: int) -> Iterable[Tuple[DateTime, DateTime]]: + """ + Generates list of ranges starting from start up to end date with duration of ranges_days. + Args: + start (DateTime): start of the range + end (DateTime): end of the range + range_days (int): Number in days to split subranges into. + + Returns: + List[Tuple[DateTime, DateTime]]: list of tuples with ranges. + + Each tuple contains two daytime variables: first is period start + and second is period end. 
+ """ + if start > end: + return [] + + next_start = start + period = pendulum.Duration(days=range_days) + while next_start < end: + next_end = min(next_start + period, end) + yield next_start, next_end + next_start = next_end + + def stream_slices( + self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[StreamSlice]]: + + start_datetime = self.get_start_date(stream_state) + + return ( + StreamSlice(start_date=start, end_date=end) + for start, end in self.make_datetime_ranges(start_datetime, pendulum.now("UTC"), self.RANGE_LENGTH_DAYS) + ) diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_exports_stream.py b/airbyte-integrations/connectors/source-iterable/unit_tests/test_exports_stream.py index 4f151b615515..32c5c0c3e49b 100644 --- a/airbyte-integrations/connectors/source-iterable/unit_tests/test_exports_stream.py +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_exports_stream.py @@ -5,10 +5,12 @@ import json from unittest import mock +import pendulum import pytest import responses from airbyte_cdk.models import SyncMode from source_iterable.api import EmailSend +from source_iterable.iterable_streams import IterableExportStreamRanged, StreamSlice @pytest.fixture @@ -24,7 +26,8 @@ def session_mock(): def test_send_email_stream(session_mock): stream = EmailSend(start_date="2020", api_key="") - _ = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=[], stream_state={})) + stream_slice = StreamSlice(start_date=pendulum.parse("2020"), end_date=pendulum.parse("2021")) + _ = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=stream_slice, stream_state={})) assert session_mock.send.called send_args = session_mock.send.call_args[1] @@ -33,10 +36,67 @@ def test_send_email_stream(session_mock): @responses.activate def test_stream_correct(): + stream_slice = 
StreamSlice(start_date=pendulum.parse("2020"), end_date=pendulum.parse("2021")) record_js = {"createdAt": "2020"} NUMBER_OF_RECORDS = 10 ** 2 resp_body = "\n".join([json.dumps(record_js)] * NUMBER_OF_RECORDS) responses.add("GET", "https://api.iterable.com/api/export/data.json", body=resp_body) stream = EmailSend(start_date="2020", api_key="") - records = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=[], stream_state={})) + records = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=stream_slice, stream_state={})) assert len(records) == NUMBER_OF_RECORDS + + +@pytest.mark.parametrize( + "start_day,end_day,days,range", + [ + ( + "2020-01-01", + "2020-01-10", + 5, + [ + (pendulum.parse("2020-01-01"), pendulum.parse("2020-01-06")), + (pendulum.parse("2020-01-06"), pendulum.parse("2020-01-10")), + ], + ), + ( + "2020-01-01", + "2020-01-10 20:00:12", + 5, + [ + (pendulum.parse("2020-01-01"), pendulum.parse("2020-01-06")), + (pendulum.parse("2020-01-06"), pendulum.parse("2020-01-10 20:00:12")), + ], + ), + ( + "2020-01-01", + "2020-01-01 20:00:12", + 5, + [ + (pendulum.parse("2020-01-01"), pendulum.parse("2020-01-01 20:00:12")), + ], + ), + ( + "2020-01-01", + "2020-01-10", + 50, + [(pendulum.parse("2020-01-01"), pendulum.parse("2020-01-10"))], + ), + ( + "2020-01-01", + "2020-01-01", + 50, + [], + ), + ], +) +def test_datetime_ranges(start_day, end_day, days, range): + start_day = pendulum.parse(start_day) + end_day = pendulum.parse(end_day) + assert list(IterableExportStreamRanged.make_datetime_ranges(start_day, end_day, days)) == range + + +def test_datetime_wrong_range(): + start_day = pendulum.parse("2020") + end_day = pendulum.parse("2000") + with pytest.raises(StopIteration): + next(IterableExportStreamRanged.make_datetime_ranges(start_day, end_day, 1)) diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_source.py 
b/airbyte-integrations/connectors/source-iterable/unit_tests/test_source.py new file mode 100644 index 000000000000..ce3ed07a2f0b --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_source.py @@ -0,0 +1,51 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +import json +import math +from unittest import mock + +import freezegun +import pendulum +import pytest +import responses +from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream +from source_iterable.api import EmailSend +from source_iterable.source import SourceIterable + + +@pytest.fixture +def response_mock(): + with responses.RequestsMock() as resp_mock: + record_js = {"createdAt": "2020"} + resp_body = "\n".join([json.dumps(record_js)]) + responses.add("GET", "https://api.iterable.com/api/export/data.json", body=resp_body) + yield resp_mock + + +@responses.activate +@freezegun.freeze_time("2021-01-01") +def test_stream_correct(response_mock): + TEST_START_DATE = "2020" + test_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream(name="email_send", json_schema={}), + sync_mode="full_refresh", + destination_sync_mode="append", + ) + ] + ) + chunks = math.ceil((pendulum.today() - pendulum.parse(TEST_START_DATE)).days / EmailSend.RANGE_LENGTH_DAYS) + + source = SourceIterable() + records = list( + source.read( + mock.MagicMock(), + {"start_date": TEST_START_DATE, "api_key": "api_key"}, + test_catalog, + None, + ) + ) + assert len(records) == chunks diff --git a/docs/integrations/sources/iterable.md b/docs/integrations/sources/iterable.md index 9b9830a8af0b..c3f608c9a5e1 100644 --- a/docs/integrations/sources/iterable.md +++ b/docs/integrations/sources/iterable.md @@ -58,6 +58,7 @@ Please read [How to find your API key](https://support.iterable.com/hc/en-us/art | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| `0.1.12` | 2021-11-09 | 
[7780](https://github.com/airbytehq/airbyte/pull/7780) | Split EmailSend stream into slices to fix premature connection close error | | `0.1.11` | 2021-11-03 | [7619](https://github.com/airbytehq/airbyte/pull/7619) | Bugfix type error while incrementally loading the `Templates` stream | | `0.1.10` | 2021-11-03 | [7591](https://github.com/airbytehq/airbyte/pull/7591) | Optimize export streams memory consumption for large requests | | `0.1.9` | 2021-10-06 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Enable campaign_metrics stream |