Skip to content

Commit

Permalink
🐛 Source Amplitude: add error descriptions and fix events stream fail on 404 (#12430)
Browse files Browse the repository at this point in the history
  • Loading branch information
bazarnov committed Apr 28, 2022
1 parent 42a58b0 commit fd4b71e
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
- name: Amplitude
sourceDefinitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396
dockerRepository: airbyte/source-amplitude
dockerImageTag: 0.1.4
dockerImageTag: 0.1.5
documentationUrl: https://docs.airbyte.io/integrations/sources/amplitude
icon: amplitude.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-amplitude:0.1.4"
- dockerImage: "airbyte/source-amplitude:0.1.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/amplitude"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/source-amplitude
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream

from .errors import HTTP_ERROR_CODES, error_msg_from_status


class AmplitudeStream(HttpStream, ABC):

Expand All @@ -27,8 +29,12 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
return None

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
    """Yield the stream's records from the API response.

    Statuses listed in ``HTTP_ERROR_CODES`` (400/404/504) are logged via
    ``error_msg_from_status`` and produce no records instead of failing
    the sync; any other response is parsed as JSON.
    """
    status = response.status_code
    if status in HTTP_ERROR_CODES:  # membership test; `.keys()` is redundant
        error_msg_from_status(status)  # logs a human-readable description
    else:
        # `data_field` names the JSON key holding this stream's records,
        # e.g. "cohorts" for Cohorts, "data" for Annotations.
        yield from response.json().get(self.data_field, [])

def path(self, **kwargs) -> str:
    """Return the endpoint path: "<api_version>/<stream name>"."""
    endpoint = "{}/{}".format(self.api_version, self.name)
    return endpoint
Expand All @@ -37,14 +43,12 @@ def path(self, **kwargs) -> str:
class Cohorts(AmplitudeStream):
primary_key = "id"
api_version = 3
data_field = "cohorts"


class Annotations(AmplitudeStream):
primary_key = "id"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_data = response.json()
yield from response_data.get("data", [])
data_field = "data"


class IncrementalAmplitudeStream(AmplitudeStream, ABC):
Expand Down Expand Up @@ -124,6 +128,22 @@ def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]:
for record in file:
yield json.loads(record)

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
    """Build the list of date-window slices to sync, from the cursor
    (or the configured start date when there is no state) up to now.

    Each slice is a dict with "start"/"end" formatted per ``date_template``;
    windows advance by ``time_interval`` steps.
    """
    # Resume from the saved cursor when state exists, else from the start date.
    if stream_state:
        cursor = pendulum.parse(stream_state.get(self.cursor_field))
    else:
        cursor = self._start_date
    now = pendulum.now()

    date_slices = []
    while cursor <= now:
        window = {
            "start": cursor.strftime(self.date_template),
            "end": self._get_end_date(cursor).strftime(self.date_template),
        }
        date_slices.append(window)
        cursor = cursor.add(**self.time_interval)
    return date_slices

def read_records(
    self,
    sync_mode: SyncMode,
    # NOTE(review): the cursor_field/stream_slice parameter lines are hidden
    # behind a diff fold; defaults reconstructed from the Airbyte CDK
    # HttpStream.read_records signature — confirm against the full file.
    cursor_field: List[str] = None,
    stream_slice: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
    """Read one date-window slice from the Export API.

    Known error statuses (400/404/504) are logged and yield no records;
    any other HTTPError is logged and re-raised.
    """
    stream_state = stream_state or {}
    start = pendulum.parse(stream_slice["start"]).add(hours=6)
    end = pendulum.parse(stream_slice["end"])

    # The API returns data only when requested with a difference between
    # 'start' and 'end' of 6 or more hours. Must actually `return` here —
    # a bare `yield from []` would fall through and still issue the request.
    if start > end:
        return

    # Sometimes the API throws a 404 for non-obvious reasons, e.g. when there
    # is no data for the specified time period; handle and log instead of failing.
    # https://developers.amplitude.com/docs/export-api#status-codes
    try:
        self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%d')} - {end.strftime('%Y-%m-%d')}")
        yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
    except requests.exceptions.HTTPError as error:
        status = error.response.status_code
        if status in HTTP_ERROR_CODES:
            error_msg_from_status(status)  # swallow: logged, no records
        else:
            self.logger.error(f"Error during syncing {self.name} stream - {error}")
            raise

def request_params(self, stream_slice: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
    """Return query parameters for the slice's date window.

    "start"/"end" are taken from the slice and re-formatted with
    ``date_template``.
    """
    # Copy before mutating: assigning `self.base_params` directly and then
    # writing "start"/"end" into it would mutate the shared mapping across
    # calls (assumes base_params is a plain dict — TODO confirm).
    params = dict(self.base_params)
    params["start"] = pendulum.parse(stream_slice["start"]).strftime(self.date_template)
    params["end"] = pendulum.parse(stream_slice["end"]).strftime(self.date_template)
    return params

def path(self, **kwargs) -> str:
    """Return the Export API endpoint path, e.g. "2/export"."""
    # Slice-specific bounds are passed as query params (see request_params),
    # so the path itself is constant per stream.
    return f"{self.api_version}/export"


Expand All @@ -168,9 +189,10 @@ class ActiveUsers(IncrementalAmplitudeStream):
name = "active_users"
primary_key = "date"
time_interval = {"months": 1}
data_field = "data"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_data = response.json().get("data", [])
response_data = response.json().get(self.data_field, [])
if response_data:
series = list(map(list, zip(*response_data["series"])))
for i, date in enumerate(response_data["xValues"]):
Expand All @@ -184,9 +206,10 @@ class AverageSessionLength(IncrementalAmplitudeStream):
name = "average_session_length"
primary_key = "date"
time_interval = {"days": 15}
data_field = "data"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_data = response.json().get("data", [])
response_data = response.json().get(self.data_field, [])
if response_data:
# From the Amplitude documentation it follows that "series" is an array with one element which is itself
# an array that contains the average session length for each day.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging

LOGGER = logging.getLogger("airbyte")

# Amplitude Export API statuses that should not fail the sync outright.
# Each entry maps a status code to a human-readable description ("msg") and
# the level to log it at ("lvl").
# See https://developers.amplitude.com/docs/export-api#status-codes
HTTP_ERROR_CODES = {
    400: {
        "msg": "The file size of the exported data is too large. Shorten the time ranges and try again. The limit size is 4GB.",
        "lvl": "ERROR",
    },
    404: {
        "msg": "No data collected",
        "lvl": "WARN",
    },
    504: {
        "msg": "The amount of data is large causing a timeout. For large amounts of data, the Amazon S3 destination is recommended.",
        "lvl": "ERROR",
    },
}


def error_msg_from_status(status: int = None):
    """Log the description of a known HTTP status at its configured level.

    Unknown or missing statuses are logged as a generic error instead of
    raising — the original indexed ``HTTP_ERROR_CODES[status]`` directly and
    crashed with ``KeyError`` on any truthy status not in the table.
    """
    entry = HTTP_ERROR_CODES.get(status) if status else None
    if entry is None:
        LOGGER.error(f"Unknown error occurred: code {status}")
    elif entry["lvl"] == "ERROR":
        LOGGER.error(entry["msg"])
    else:
        # Logger.warn is deprecated in favour of Logger.warning.
        LOGGER.warning(entry["msg"])
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import airbyte_cdk.models
import pytest
import requests
from airbyte_cdk.models import SyncMode
from source_amplitude.api import Events


Expand All @@ -13,16 +13,29 @@ def __init__(self, status_code):
self.status_code = status_code


def test_incremental_http_error_handler(mocker):
    """Statuses in HTTP_ERROR_CODES (400/404/504) are swallowed — the stream
    yields no records — while any other HTTP error (e.g. 403) is re-raised.
    """
    stream = Events(start_date="2021-01-01T00:00:00Z")
    stream_slice = stream.stream_slices()[0]

    # 404: handled; the generator is empty, so next() raises StopIteration.
    # (The original asserted `result == []` inside the raises-block — dead
    # code, since next() raises before the assert runs.)
    send_request_mocker = mocker.patch.object(
        stream, "_send_request", side_effect=requests.HTTPError(response=MockRequest(404))
    )
    with pytest.raises(StopIteration):
        next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))

    # 403 is not in HTTP_ERROR_CODES, so the error propagates.
    send_request_mocker.side_effect = requests.HTTPError(response=MockRequest(403))
    with pytest.raises(requests.exceptions.HTTPError):
        next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))

    # 400 and 504 are handled exactly like 404.
    for status in (400, 504):
        send_request_mocker.side_effect = requests.HTTPError(response=MockRequest(status))
        with pytest.raises(StopIteration):
            next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
1 change: 1 addition & 0 deletions docs/integrations/sources/amplitude.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Please read [How to get your API key and Secret key](https://help.amplitude.com/

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :----------------------------------------------------- | :------ |
| 0.1.5 | 2022-04-28 | [12430](https://github.com/airbytehq/airbyte/pull/12430) | Added HTTP error descriptions and fixed `Events` stream fail caused by `404` HTTP Error |
| 0.1.4 | 2021-12-23 | [8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications |
| 0.1.3 | 2021-10-12 | [6375](https://github.com/airbytehq/airbyte/pull/6375) | Log Transient 404 Error in Events stream |
| 0.1.2 | 2021-09-21 | [6353](https://github.com/airbytehq/airbyte/pull/6353) | Correct output schemas on cohorts, events, active\_users, and average\_session\_lengths streams |
Expand Down

0 comments on commit fd4b71e

Please sign in to comment.