From ddb997f374c00e638f5aea80b8d13eb563c22f3c Mon Sep 17 00:00:00 2001 From: Baz Date: Thu, 28 Apr 2022 14:57:50 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20=20Source=20Amplitude:=20add=20e?= =?UTF-8?q?rror=20descriptions=20and=20fix=20`events`=20stream=20fail=20on?= =?UTF-8?q?=20404=20(#12430)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-amplitude/Dockerfile | 2 +- .../source-amplitude/source_amplitude/api.py | 71 ++++++++++++------- .../source_amplitude/errors.py | 34 +++++++++ .../source-amplitude/unit_tests/unit_test.py | 21 ++++-- docs/integrations/sources/amplitude.md | 1 + 7 files changed, 102 insertions(+), 31 deletions(-) create mode 100644 airbyte-integrations/connectors/source-amplitude/source_amplitude/errors.py diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 33756fb40746ed..9265e1656a75dc 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -35,7 +35,7 @@ - name: Amplitude sourceDefinitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396 dockerRepository: airbyte/source-amplitude - dockerImageTag: 0.1.4 + dockerImageTag: 0.1.5 documentationUrl: https://docs.airbyte.io/integrations/sources/amplitude icon: amplitude.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index f1c4be36edefa1..e18e005985dcdf 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -476,7 +476,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: 
"airbyte/source-amplitude:0.1.4" +- dockerImage: "airbyte/source-amplitude:0.1.5" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/amplitude" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-amplitude/Dockerfile b/airbyte-integrations/connectors/source-amplitude/Dockerfile index 210e952009ec75..f2cad066bbc678 100644 --- a/airbyte-integrations/connectors/source-amplitude/Dockerfile +++ b/airbyte-integrations/connectors/source-amplitude/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/source-amplitude diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py index f5e3e68a3f229c..0d2908608a5e35 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py @@ -17,6 +17,8 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream +from .errors import HTTP_ERROR_CODES, error_msg_from_status + class AmplitudeStream(HttpStream, ABC): @@ -27,8 +29,12 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, return None def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_data = response.json() - yield from response_data.get(self.name, []) + status = response.status_code + if status in HTTP_ERROR_CODES.keys(): + error_msg_from_status(status) + yield from [] + else: + yield from response.json().get(self.data_field, []) def path(self, **kwargs) -> str: return f"{self.api_version}/{self.name}" @@ -37,14 +43,12 @@ def path(self, **kwargs) -> str: class Cohorts(AmplitudeStream): primary_key = "id" api_version 
= 3 + data_field = "cohorts" class Annotations(AmplitudeStream): primary_key = "id" - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_data = response.json() - yield from response_data.get("data", []) + data_field = "data" class IncrementalAmplitudeStream(AmplitudeStream, ABC): @@ -124,6 +128,22 @@ def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]: for record in file: yield json.loads(record) + def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: + slices = [] + start = self._start_date + if stream_state: + start = pendulum.parse(stream_state.get(self.cursor_field)) + end = pendulum.now() + while start <= end: + slices.append( + { + "start": start.strftime(self.date_template), + "end": self._get_end_date(start).strftime(self.date_template), + } + ) + start = start.add(**self.time_interval) + return slices + def read_records( self, sync_mode: SyncMode, @@ -132,34 +152,35 @@ def read_records( stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: stream_state = stream_state or {} - params = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=None) - # API returns data only when requested with a difference between 'start' and 'end' of 6 or more hours. - if pendulum.parse(params["start"]).add(hours=6) > pendulum.parse(params["end"]): - return [] + start = pendulum.parse(stream_slice["start"]).add(hours=6) + end = pendulum.parse(stream_slice["end"]) + if start > end: + yield from [] + # sometimes the API throws a 404 error for not obvious reasons, we have to handle it and log it. 
# for example, if there is no data from the specified time period, a 404 exception is thrown # https://developers.amplitude.com/docs/export-api#status-codes + try: + self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%d')} - {end.strftime('%Y-%m-%d')}") yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state) except requests.exceptions.HTTPError as error: - if error.response.status_code == 404: - self.logger.warn(f"Error during syncing {self.name} stream - {error}") - return [] + status = error.response.status_code + if status in HTTP_ERROR_CODES.keys(): + error_msg_from_status(status) + yield from [] else: + self.logger.error(f"Error during syncing {self.name} stream - {error}") raise - def request_params( - self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs - ) -> MutableMapping[str, Any]: - params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs) - if stream_state or next_page_token: - params["start"] = pendulum.parse(params["start"]).add(hours=1).strftime(self.date_template) + def request_params(self, stream_slice: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: + params = self.base_params + params["start"] = pendulum.parse(stream_slice["start"]).strftime(self.date_template) + params["end"] = pendulum.parse(stream_slice["end"]).strftime(self.date_template) return params - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: + def path(self, **kwargs) -> str: return f"{self.api_version}/export" @@ -168,9 +189,10 @@ class ActiveUsers(IncrementalAmplitudeStream): name = "active_users" primary_key = "date" time_interval = {"months": 1} + data_field = "data" def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_data = response.json().get("data", []) + response_data = 
response.json().get(self.data_field, []) if response_data: series = list(map(list, zip(*response_data["series"]))) for i, date in enumerate(response_data["xValues"]): @@ -184,9 +206,10 @@ class AverageSessionLength(IncrementalAmplitudeStream): name = "average_session_length" primary_key = "date" time_interval = {"days": 15} + data_field = "data" def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_data = response.json().get("data", []) + response_data = response.json().get(self.data_field, []) if response_data: # From the Amplitude documentation it follows that "series" is an array with one element which is itself # an array that contains the average session length for each day. diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/errors.py b/airbyte-integrations/connectors/source-amplitude/source_amplitude/errors.py new file mode 100644 index 00000000000000..037dd7f6401bdd --- /dev/null +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/errors.py @@ -0,0 +1,34 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +import logging + +LOGGER = logging.getLogger("airbyte") + +HTTP_ERROR_CODES = { + 400: { + "msg": "The file size of the exported data is too large. Shorten the time ranges and try again. The limit size is 4GB.", + "lvl": "ERROR", + }, + 404: { + "msg": "No data collected", + "lvl": "WARN", + }, + 504: { + "msg": "The amount of data is large causing a timeout. 
For large amounts of data, the Amazon S3 destination is recommended.", + "lvl": "ERROR", + }, +} + + +def error_msg_from_status(status: int = None): + if status: + level = HTTP_ERROR_CODES[status]["lvl"] + message = HTTP_ERROR_CODES[status]["msg"] + if level == "ERROR": + LOGGER.error(message) + elif level == "WARN": + LOGGER.warn(message) + else: + LOGGER.error(f"Unknown error occurred: code {status}") diff --git a/airbyte-integrations/connectors/source-amplitude/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-amplitude/unit_tests/unit_test.py index 8eb316ca38e35e..f691390f40ec15 100755 --- a/airbyte-integrations/connectors/source-amplitude/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-amplitude/unit_tests/unit_test.py @@ -2,9 +2,9 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # -import airbyte_cdk.models import pytest import requests +from airbyte_cdk.models import SyncMode from source_amplitude.api import Events @@ -13,16 +13,29 @@ def __init__(self, status_code): self.status_code = status_code -def test_http_error_handler(mocker): +def test_incremental_http_error_handler(mocker): stream = Events(start_date="2021-01-01T00:00:00Z") + stream_slice = stream.stream_slices()[0] mock_response = MockRequest(404) send_request_mocker = mocker.patch.object(stream, "_send_request", side_effect=requests.HTTPError(**{"response": mock_response})) with pytest.raises(StopIteration): - result = next(stream.read_records(sync_mode=airbyte_cdk.models.SyncMode.full_refresh)) + result = next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)) assert result == [] mock_response = MockRequest(403) send_request_mocker.side_effect = requests.HTTPError(**{"response": mock_response}) with pytest.raises(requests.exceptions.HTTPError): - next(stream.read_records(sync_mode=airbyte_cdk.models.SyncMode.full_refresh)) + next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)) + + 
mock_response = MockRequest(400) + send_request_mocker.side_effect = requests.HTTPError(**{"response": mock_response}) + with pytest.raises(StopIteration): + result = next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)) + assert result == [] + + mock_response = MockRequest(504) + send_request_mocker.side_effect = requests.HTTPError(**{"response": mock_response}) + with pytest.raises(StopIteration): + result = next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)) + assert result == [] diff --git a/docs/integrations/sources/amplitude.md b/docs/integrations/sources/amplitude.md index 8039e147b80e8d..93384fd00613d2 100644 --- a/docs/integrations/sources/amplitude.md +++ b/docs/integrations/sources/amplitude.md @@ -45,6 +45,7 @@ Please read [How to get your API key and Secret key](https://help.amplitude.com/ | Version | Date | Pull Request | Subject | | :------ | :--------- | :----------------------------------------------------- | :------ | +| 0.1.5 | 2022-04-28 | [12430](https://github.com/airbytehq/airbyte/pull/12430) | Added HTTP error descriptions and fixed `Events` stream fail caused by `404` HTTP Error | | 0.1.4 | 2021-12-23 | [8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications | | 0.1.3 | 2021-10-12 | [6375](https://github.com/airbytehq/airbyte/pull/6375) | Log Transient 404 Error in Events stream | | 0.1.2 | 2021-09-21 | [6353](https://github.com/airbytehq/airbyte/pull/6353) | Correct output schemas on cohorts, events, active\_users, and average\_session\_lengths streams |