Source Amplitude: refactor Events stream #25317

Merged · 15 commits · Apr 20, 2023
@@ -1111,6 +1111,54 @@
"public": true,
"custom": false,
"releaseStage": "alpha"
}, {
"destinationDefinitionId": "e088acb6-9780-4568-880c-54c2dd7f431b",
"name": "Cumul.io",
"dockerRepository": "airbyte/destination-cumulio",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/cumulio",
"icon": "cumulio.svg",
"spec": {
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/cumulio",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Destination Cumulio",
"type": "object",
"required": [ "api_host", "api_key", "api_token" ],
"additionalProperties": true,
"properties": {
"api_host": {
"title": "Cumul.io API Host URL",
"description": "URL of the Cumul.io API (e.g. 'https://api.cumul.io', 'https://api.us.cumul.io', or VPC-specific API url). Defaults to 'https://api.cumul.io'.",
"default": "https://api.cumul.io",
"type": "string",
"order": 0
},
"api_key": {
"title": "Cumul.io API Key",
"description": "An API key generated in Cumul.io's platform (can be generated here: https://app.cumul.io/start/profile/integration).",
"type": "string",
"airbyte_secret": true,
"order": 1
},
"api_token": {
"title": "Cumul.io API Token",
"description": "The corresponding API token generated in Cumul.io's platform (can be generated here: https://app.cumul.io/start/profile/integration).",
"type": "string",
"airbyte_secret": true,
"order": 2
}
}
},
"supportsIncremental": true,
"supportsNormalization": false,
"supportsDBT": false,
"supported_destination_sync_modes": [ "overwrite", "append" ]
},
"tombstone": false,
"public": true,
"custom": false,
"releaseStage": "alpha"
}, {
"destinationDefinitionId": "81740ce8-d764-4ea7-94df-16bb41de36ae",
"name": "Chargify (Keen)",
@@ -8987,7 +9035,7 @@
"sourceDefinitionId": "fa9f58c6-2d03-4237-aaa4-07d75e0c1396",
"name": "Amplitude",
"dockerRepository": "airbyte/source-amplitude",
"dockerImageTag": "0.2.2",
"dockerImageTag": "0.2.3",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/amplitude",
"icon": "amplitude.svg",
"sourceType": "api",
@@ -9044,14 +9092,6 @@
"public": true,
"custom": false,
"releaseStage": "generally_available",
"resourceRequirements": {
"jobSpecific": [ {
"jobType": "sync",
"resourceRequirements": {
"memory_limit": "8Gi"
}
} ]
},
"allowedHosts": {
"hosts": [ "amplitude.com", "analytics.eu.amplitude.com" ]
}
@@ -103,16 +103,11 @@
- name: Amplitude
sourceDefinitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396
dockerRepository: airbyte/source-amplitude
dockerImageTag: 0.2.2
dockerImageTag: 0.2.3
documentationUrl: https://docs.airbyte.com/integrations/sources/amplitude
icon: amplitude.svg
sourceType: api
releaseStage: generally_available
resourceRequirements:
jobSpecific:
- jobType: sync
resourceRequirements:
memory_limit: "8Gi"
allowedHosts:
hosts:
- amplitude.com
@@ -1344,7 +1344,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-amplitude:0.2.2"
- dockerImage: "airbyte/source-amplitude:0.2.3"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/amplitude"
connectionSpecification:
@@ -34,5 +34,5 @@ COPY source_amplitude ./source_amplitude
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.name=airbyte/source-amplitude
@@ -119,45 +119,12 @@ definitions:
primary_key: "date"
path: "/2/users"

events_stream:
$ref: "#/definitions/base_incremental_stream"
retriever:
paginator:
type: NoPagination
requester:
$ref: "#/definitions/requester"
request_parameters:
start: "{{format_datetime(stream_slice.start_time, '%Y%m%dT%H') }}"
end: "{{format_datetime(stream_slice.end_time, '%Y%m%dT%H') }}"
record_selector:
type: RecordSelector
extractor:
type: CustomRecordExtractor
class_name: source_amplitude.components.EventsExtractor
record_filter:
condition: "{{ record[parameters['stream_cursor_field']] > stream_state.get(parameters['stream_cursor_field'],config['start_date']) }}"
incremental_sync:
$ref: "#/definitions/datetime_incremental_sync"
step: "PT{{config.get('request_time_range', 4)}}H"
cursor_field: "{{ parameters.get('stream_cursor_field') }}"
cursor_granularity: PT1H
start_datetime:
datetime: "{{ format_datetime(config['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z') }}"
end_datetime:
datetime: "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%S.%f%z') }}"
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
$parameters:
name: "events"
primary_key: "uuid"
path: "/2/export"
stream_cursor_field: "server_upload_time"

streams:
- "#/definitions/annotations_stream"
- "#/definitions/cohorts_stream"
- "#/definitions/average_session_length_stream"
- "#/definitions/active_users_stream"
- "#/definitions/events_stream"

check:
stream_names:
@@ -2,7 +2,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from base64 import b64encode
from typing import Any, List, Mapping

from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from source_amplitude.streams import Events

"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
@@ -16,3 +22,22 @@
class SourceAmplitude(YamlDeclarativeSource):
def __init__(self):
super().__init__(**{"path_to_yaml": "manifest.yaml"})

def _convert_auth_to_token(self, username: str, password: str) -> str:
username = username.encode("latin1")
password = password.encode("latin1")
token = b64encode(b":".join((username, password))).strip().decode("ascii")
return token

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
streams = super().streams(config=config)
auth = TokenAuthenticator(token=self._convert_auth_to_token(config["api_key"], config["secret_key"]), auth_method="Basic")
streams.append(
Events(
authenticator=auth,
start_date=config["start_date"],
data_region=config["data_region"],
event_time_interval={"size_unit": "hours", "size": config.get("request_time_range", 24)},
)
)
return streams
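
For context, _convert_auth_to_token builds a standard HTTP Basic credential from the Amplitude API key and secret key. A minimal sketch of the equivalence, using placeholder keys rather than real credentials:

from base64 import b64encode

api_key, secret_key = "my_api_key", "my_secret_key"  # placeholders
token = b64encode(f"{api_key}:{secret_key}".encode("latin1")).decode("ascii")
# TokenAuthenticator(token=token, auth_method="Basic") then sends an Authorization
# header of the form: Authorization: Basic bXlfYXBpX2tleTpteV9zZWNyZXRfa2V5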
@@ -0,0 +1,187 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import gzip
import io
import json
import logging
import zipfile
from typing import IO, Any, Iterable, List, Mapping, MutableMapping, Optional

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.http import HttpStream

LOGGER = logging.getLogger("airbyte")

HTTP_ERROR_CODES = {
400: {
"msg": "The file size of the exported data is too large. Shorten the time ranges and try again. The limit size is 4GB.",
"lvl": "ERROR",
},
404: {
"msg": "No data collected",
"lvl": "WARN",
},
504: {
"msg": "The amount of data is large causing a timeout. For large amounts of data, the Amazon S3 destination is recommended.",
"lvl": "ERROR",
},
}


def error_msg_from_status(status: int = None):
if status:
level = HTTP_ERROR_CODES[status]["lvl"]
message = HTTP_ERROR_CODES[status]["msg"]
if level == "ERROR":
LOGGER.error(message)
elif level == "WARN":
LOGGER.warning(message)
else:
LOGGER.error(f"Unknown error occured: code {status}")


class Events(HttpStream):
api_version = 2
base_params = {}
cursor_field = "server_upload_time"
date_template = "%Y%m%dT%H"
compare_date_template = "%Y-%m-%d %H:%M:%S.%f"
primary_key = "uuid"
state_checkpoint_interval = 1000

def __init__(self, data_region: str, start_date: str, event_time_interval: dict = None, **kwargs):
if event_time_interval is None:
event_time_interval = {"size_unit": "hours", "size": 24}
self.data_region = data_region
self.event_time_interval = event_time_interval
self._start_date = pendulum.parse(start_date) if isinstance(start_date, str) else start_date
self.date_time_fields = self._get_date_time_items_from_schema()
super().__init__(**kwargs)

@property
def url_base(self) -> str:
subdomain = "analytics.eu." if self.data_region == "EU Residency Server" else ""
return f"https://{subdomain}amplitude.com/api/"

@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None

@property
def time_interval(self) -> dict:
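# e.g. {"hours": 24}; unpacked into pendulum's .add() in stream_slices()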
return {self.event_time_interval.get("size_unit"): self.event_time_interval.get("size")}

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
# save state value in source native format
if self.compare_date_template:
latest_state = pendulum.parse(latest_record[self.cursor_field]).strftime(self.compare_date_template)
else:
latest_state = latest_record.get(self.cursor_field, "")
return {self.cursor_field: max(latest_state, current_stream_state.get(self.cursor_field, ""))}

def _get_date_time_items_from_schema(self):
"""
Get all properties from schema with format: 'date-time'
"""
result = []
schema = self.get_json_schema()
for key, value in schema["properties"].items():
if value.get("format") == "date-time":
result.append(key)
return result

def _date_time_to_rfc3339(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""
Transform 'date-time' items to RFC3339 format
"""
for item in record:
if item in self.date_time_fields and record[item]:
record[item] = pendulum.parse(record[item]).to_rfc3339_string()
return record

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
state_value = stream_state[self.cursor_field] if stream_state else self._start_date.strftime(self.compare_date_template)
try:
zip_file = zipfile.ZipFile(io.BytesIO(response.content))
except zipfile.BadZipFile as e:
self.logger.exception(e)
self.logger.error(
f"Received an invalid zip file in response to URL: {response.request.url}."
f"The size of the response body is: {len(response.content)}"
)
return []

for gzip_filename in zip_file.namelist():
with zip_file.open(gzip_filename) as file:
for record in self._parse_zip_file(file):
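# client-side incremental filtering: the export window is requested by start/end
# only, so records older than the saved cursor value are dropped here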
if record[self.cursor_field] >= state_value:
yield self._date_time_to_rfc3339(record) # transform all `date-time` to RFC3339

def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[MutableMapping]:
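# The export response is a zip archive whose entries are gzip-compressed files of
# newline-delimited JSON events, hence the zipfile -> gzip -> json.loads chain.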
with gzip.open(zip_file) as file:
for record in file:
yield json.loads(record)

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
slices = []
start = pendulum.parse(stream_state.get(self.cursor_field)) if stream_state else self._start_date
end = pendulum.now()
if start > end:
self.logger.info("The data cannot be requested in the future. Skipping stream.")
return []

while start <= end:
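# each slice spans [start, start + interval - 1h] at hour granularity,
# so consecutive windows do not overlap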
slices.append(
{
"start": start.strftime(self.date_template),
"end": start.add(**self.time_interval).subtract(hours=1).strftime(self.date_template),
}
)
start = start.add(**self.time_interval)

return slices

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
start = pendulum.parse(stream_slice["start"])
end = pendulum.parse(stream_slice["end"])
if start > end:
yield from []
# Sometimes the API returns a 404 for non-obvious reasons; handle and log it instead of failing the sync.
# For example, a 404 is returned when there is no data for the specified time period.
# https://developers.amplitude.com/docs/export-api#status-codes
try:
self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%dT%H')} - {end.strftime('%Y-%m-%dT%H')}")
records = super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
yield from records
except requests.exceptions.HTTPError as error:
status = error.response.status_code
if status in HTTP_ERROR_CODES.keys():
error_msg_from_status(status)
yield from []
else:
self.logger.error(f"Error during syncing {self.name} stream - {error}")
raise

def request_params(self, stream_slice: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
params = self.base_params
params["start"] = pendulum.parse(stream_slice["start"]).strftime(self.date_template)
params["end"] = pendulum.parse(stream_slice["end"]).strftime(self.date_template)
return params

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return None

def path(self, **kwargs) -> str:
return f"{self.api_version}/export"