Source Amplitude: refactor Events stream (#25317)
* Source Amplitude: Refactor Events Stream, based on Python CDK

* Source Amplitude: bump docker version

* Source Amplitude: Fix: add get_updated_state

* Revert "Increase memory_limit for Amplitude (#25282)"

This reverts commit 06354ae.

* Source Amplitude: Fix backport compatibility with new config

* Update source_definitions.yaml

* Source Amplitude: update catalogs


---------

Co-authored-by: artem1205 <artem1205@users.noreply.github.com>
artem1205 committed Apr 20, 2023
1 parent a6d6b06 commit aaf4bf0
Showing 8 changed files with 266 additions and 51 deletions.
@@ -1111,6 +1111,54 @@
"public": true,
"custom": false,
"releaseStage": "alpha"
}, {
"destinationDefinitionId": "e088acb6-9780-4568-880c-54c2dd7f431b",
"name": "Cumul.io",
"dockerRepository": "airbyte/destination-cumulio",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/cumulio",
"icon": "cumulio.svg",
"spec": {
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/cumulio",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Destination Cumulio",
"type": "object",
"required": [ "api_host", "api_key", "api_token" ],
"additionalProperties": true,
"properties": {
"api_host": {
"title": "Cumul.io API Host URL",
"description": "URL of the Cumul.io API (e.g. 'https://api.cumul.io', 'https://api.us.cumul.io', or VPC-specific API url). Defaults to 'https://api.cumul.io'.",
"default": "https://api.cumul.io",
"type": "string",
"order": 0
},
"api_key": {
"title": "Cumul.io API Key",
"description": "An API key generated in Cumul.io's platform (can be generated here: https://app.cumul.io/start/profile/integration).",
"type": "string",
"airbyte_secret": true,
"order": 1
},
"api_token": {
"title": "Cumul.io API Token",
"description": "The corresponding API token generated in Cumul.io's platform (can be generated here: https://app.cumul.io/start/profile/integration).",
"type": "string",
"airbyte_secret": true,
"order": 2
}
}
},
"supportsIncremental": true,
"supportsNormalization": false,
"supportsDBT": false,
"supported_destination_sync_modes": [ "overwrite", "append" ]
},
"tombstone": false,
"public": true,
"custom": false,
"releaseStage": "alpha"
}, {
"destinationDefinitionId": "81740ce8-d764-4ea7-94df-16bb41de36ae",
"name": "Chargify (Keen)",
@@ -8987,7 +9035,7 @@
"sourceDefinitionId": "fa9f58c6-2d03-4237-aaa4-07d75e0c1396",
"name": "Amplitude",
"dockerRepository": "airbyte/source-amplitude",
"dockerImageTag": "0.2.2",
"dockerImageTag": "0.2.3",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/amplitude",
"icon": "amplitude.svg",
"sourceType": "api",
@@ -9044,14 +9092,6 @@
"public": true,
"custom": false,
"releaseStage": "generally_available",
"resourceRequirements": {
"jobSpecific": [ {
"jobType": "sync",
"resourceRequirements": {
"memory_limit": "8Gi"
}
} ]
},
"allowedHosts": {
"hosts": [ "amplitude.com", "analytics.eu.amplitude.com" ]
}
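Aside: the Cumul.io connectionSpecification added above is plain JSON Schema, so a destination config can be checked against it directly. A minimal sketch (abridged schema, made-up credential values, validated with the third-party jsonschema package, which is not part of the seed files):

import jsonschema  # used only for this illustration

CUMULIO_SPEC = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "required": ["api_host", "api_key", "api_token"],
    "properties": {
        "api_host": {"type": "string", "default": "https://api.cumul.io"},
        "api_key": {"type": "string"},
        "api_token": {"type": "string"},
    },
}

sample_config = {
    "api_host": "https://api.cumul.io",
    "api_key": "<api-key-from-app.cumul.io>",      # placeholder, not a real key
    "api_token": "<api-token-from-app.cumul.io>",  # placeholder, not a real token
}

jsonschema.validate(sample_config, CUMULIO_SPEC)  # raises ValidationError if the config is invalid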
@@ -103,16 +103,11 @@
- name: Amplitude
  sourceDefinitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396
  dockerRepository: airbyte/source-amplitude
-  dockerImageTag: 0.2.2
+  dockerImageTag: 0.2.3
  documentationUrl: https://docs.airbyte.com/integrations/sources/amplitude
  icon: amplitude.svg
  sourceType: api
  releaseStage: generally_available
-  resourceRequirements:
-    jobSpecific:
-      - jobType: sync
-        resourceRequirements:
-          memory_limit: "8Gi"
  allowedHosts:
    hosts:
      - amplitude.com
@@ -1344,7 +1344,7 @@
    supportsNormalization: false
    supportsDBT: false
    supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-amplitude:0.2.2"
+- dockerImage: "airbyte/source-amplitude:0.2.3"
  spec:
    documentationUrl: "https://docs.airbyte.com/integrations/sources/amplitude"
    connectionSpecification:
@@ -34,5 +34,5 @@ COPY source_amplitude ./source_amplitude
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.2.2
+LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.name=airbyte/source-amplitude
@@ -119,45 +119,12 @@ definitions:
primary_key: "date"
path: "/2/users"

events_stream:
$ref: "#/definitions/base_incremental_stream"
retriever:
paginator:
type: NoPagination
requester:
$ref: "#/definitions/requester"
request_parameters:
start: "{{format_datetime(stream_slice.start_time, '%Y%m%dT%H') }}"
end: "{{format_datetime(stream_slice.end_time, '%Y%m%dT%H') }}"
record_selector:
type: RecordSelector
extractor:
type: CustomRecordExtractor
class_name: source_amplitude.components.EventsExtractor
record_filter:
condition: "{{ record[parameters['stream_cursor_field']] > stream_state.get(parameters['stream_cursor_field'],config['start_date']) }}"
incremental_sync:
$ref: "#/definitions/datetime_incremental_sync"
step: "PT{{config.get('request_time_range', 4)}}H"
cursor_field: "{{ parameters.get('stream_cursor_field') }}"
cursor_granularity: PT1H
start_datetime:
datetime: "{{ format_datetime(config['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z') }}"
end_datetime:
datetime: "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%S.%f%z') }}"
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
$parameters:
name: "events"
primary_key: "uuid"
path: "/2/export"
stream_cursor_field: "server_upload_time"

streams:
- "#/definitions/annotations_stream"
- "#/definitions/cohorts_stream"
- "#/definitions/average_session_length_stream"
- "#/definitions/active_users_stream"
- "#/definitions/events_stream"

check:
stream_names:
@@ -2,7 +2,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

+from base64 import b64encode
+from typing import Any, List, Mapping
+
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
+from airbyte_cdk.sources.streams import Stream
+from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
+from source_amplitude.streams import Events

"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
@@ -16,3 +22,22 @@
class SourceAmplitude(YamlDeclarativeSource):
    def __init__(self):
        super().__init__(**{"path_to_yaml": "manifest.yaml"})
+
+    def _convert_auth_to_token(self, username: str, password: str) -> str:
+        username = username.encode("latin1")
+        password = password.encode("latin1")
+        token = b64encode(b":".join((username, password))).strip().decode("ascii")
+        return token
+
+    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
+        streams = super().streams(config=config)
+        auth = TokenAuthenticator(token=self._convert_auth_to_token(config["api_key"], config["secret_key"]), auth_method="Basic")
+        streams.append(
+            Events(
+                authenticator=auth,
+                start_date=config["start_date"],
+                data_region=config["data_region"],
+                event_time_interval={"size_unit": "hours", "size": config.get("request_time_range", 24)},
+            )
+        )
+        return streams
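For context on the auth wiring above: _convert_auth_to_token builds the standard HTTP Basic credential from "api_key:secret_key", which TokenAuthenticator with auth_method="Basic" then sends as the Authorization header. A standalone sanity-check sketch (made-up credentials, not part of the connector):

from base64 import b64encode

def convert_auth_to_token(username: str, password: str) -> str:
    # mirrors SourceAmplitude._convert_auth_to_token
    return b64encode(b":".join((username.encode("latin1"), password.encode("latin1")))).strip().decode("ascii")

token = convert_auth_to_token("my_api_key", "my_secret_key")
# identical to the usual base64("user:pass") Basic credential
assert token == b64encode(b"my_api_key:my_secret_key").decode("ascii")
print(f"Authorization: Basic {token}")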
@@ -0,0 +1,187 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import gzip
import io
import json
import logging
import zipfile
from typing import IO, Any, Iterable, List, Mapping, MutableMapping, Optional

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.http import HttpStream

LOGGER = logging.getLogger("airbyte")

HTTP_ERROR_CODES = {
    400: {
        "msg": "The file size of the exported data is too large. Shorten the time ranges and try again. The limit size is 4GB.",
        "lvl": "ERROR",
    },
    404: {
        "msg": "No data collected",
        "lvl": "WARN",
    },
    504: {
        "msg": "The amount of data is large causing a timeout. For large amounts of data, the Amazon S3 destination is recommended.",
        "lvl": "ERROR",
    },
}


def error_msg_from_status(status: int = None):
    if status:
        level = HTTP_ERROR_CODES[status]["lvl"]
        message = HTTP_ERROR_CODES[status]["msg"]
        if level == "ERROR":
            LOGGER.error(message)
        elif level == "WARN":
            LOGGER.warning(message)
    else:
        LOGGER.error(f"Unknown error occurred: code {status}")

class Events(HttpStream):
    api_version = 2
    base_params = {}
    cursor_field = "server_upload_time"
    date_template = "%Y%m%dT%H"
    compare_date_template = "%Y-%m-%d %H:%M:%S.%f"
    primary_key = "uuid"
    state_checkpoint_interval = 1000

    def __init__(self, data_region: str, start_date: str, event_time_interval: dict = None, **kwargs):
        if event_time_interval is None:
            event_time_interval = {"size_unit": "hours", "size": 24}
        self.data_region = data_region
        self.event_time_interval = event_time_interval
        self._start_date = pendulum.parse(start_date) if isinstance(start_date, str) else start_date
        self.date_time_fields = self._get_date_time_items_from_schema()
        super().__init__(**kwargs)

    @property
    def url_base(self) -> str:
        subdomain = "analytics.eu." if self.data_region == "EU Residency Server" else ""
        return f"https://{subdomain}amplitude.com/api/"

    @property
    def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
        return None

    @property
    def time_interval(self) -> dict:
        return {self.event_time_interval.get("size_unit"): self.event_time_interval.get("size")}

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        # save state value in source native format
        if self.compare_date_template:
            latest_state = pendulum.parse(latest_record[self.cursor_field]).strftime(self.compare_date_template)
        else:
            latest_state = latest_record.get(self.cursor_field, "")
        return {self.cursor_field: max(latest_state, current_stream_state.get(self.cursor_field, ""))}

    def _get_date_time_items_from_schema(self):
        """
        Get all properties from schema with format: 'date-time'
        """
        result = []
        schema = self.get_json_schema()
        for key, value in schema["properties"].items():
            if value.get("format") == "date-time":
                result.append(key)
        return result

    def _date_time_to_rfc3339(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        """
        Transform 'date-time' items to RFC3339 format
        """
        for item in record:
            if item in self.date_time_fields and record[item]:
                record[item] = pendulum.parse(record[item]).to_rfc3339_string()
        return record

    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
        state_value = stream_state[self.cursor_field] if stream_state else self._start_date.strftime(self.compare_date_template)
        try:
            zip_file = zipfile.ZipFile(io.BytesIO(response.content))
        except zipfile.BadZipFile as e:
            self.logger.exception(e)
            self.logger.error(
                f"Received an invalid zip file in response to URL: {response.request.url}. "
                f"The size of the response body is: {len(response.content)}"
            )
            return []

        for gzip_filename in zip_file.namelist():
            with zip_file.open(gzip_filename) as file:
                for record in self._parse_zip_file(file):
                    if record[self.cursor_field] >= state_value:
                        yield self._date_time_to_rfc3339(record)  # transform all `date-time` fields to RFC3339

    def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[MutableMapping]:
        with gzip.open(zip_file) as file:
            for record in file:
                yield json.loads(record)

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        slices = []
        start = pendulum.parse(stream_state.get(self.cursor_field)) if stream_state else self._start_date
        end = pendulum.now()
        if start > end:
            self.logger.info("The data cannot be requested in the future. Skipping stream.")
            return []

        while start <= end:
            slices.append(
                {
                    "start": start.strftime(self.date_template),
                    "end": start.add(**self.time_interval).subtract(hours=1).strftime(self.date_template),
                }
            )
            start = start.add(**self.time_interval)

        return slices

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        stream_state = stream_state or {}
        start = pendulum.parse(stream_slice["start"])
        end = pendulum.parse(stream_slice["end"])
        if start > end:
            # nothing to fetch for an empty (inverted) slice
            return
        # The API sometimes throws a 404 error for non-obvious reasons, so we have to handle and log it.
        # For example, if there is no data for the specified time period, a 404 exception is thrown:
        # https://developers.amplitude.com/docs/export-api#status-codes
        try:
            self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%dT%H')} - {end.strftime('%Y-%m-%dT%H')}")
            records = super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
            yield from records
        except requests.exceptions.HTTPError as error:
            status = error.response.status_code
            if status in HTTP_ERROR_CODES:
                error_msg_from_status(status)
                yield from []
            else:
                self.logger.error(f"Error during syncing {self.name} stream - {error}")
                raise

    def request_params(self, stream_slice: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        params = dict(self.base_params)  # copy so the shared class-level dict is not mutated
        params["start"] = pendulum.parse(stream_slice["start"]).strftime(self.date_template)
        params["end"] = pendulum.parse(stream_slice["end"]).strftime(self.date_template)
        return params

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None

    def path(self, **kwargs) -> str:
        return f"{self.api_version}/export"
